Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/claude_agent_sdk/_internal/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ async def process_query(
sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item]

# Create Query to handle control protocol
is_streaming = not isinstance(prompt, str)
# Force streaming mode if SDK MCP servers are present (they require bidirectional communication)
is_streaming = not isinstance(prompt, str) or bool(sdk_mcp_servers)
query = Query(
transport=chosen_transport,
is_streaming_mode=is_streaming,
Expand All @@ -109,11 +110,12 @@ async def process_query(
if is_streaming:
await query.initialize()

# Stream input if it's an AsyncIterable
if isinstance(prompt, AsyncIterable) and query._tg:
# Start streaming in background
# Create a task that will run in the background
query._tg.start_soon(query.stream_input, prompt)
# Stream input if in streaming mode
if query._tg:
# Use the (possibly converted) prompt from transport
stream_prompt = getattr(chosen_transport, "_prompt", prompt)
if isinstance(stream_prompt, AsyncIterable):
query._tg.start_soon(query.stream_input, stream_prompt)
# For string prompts, the prompt is already passed via CLI args

# Yield parsed messages
Expand Down
20 changes: 18 additions & 2 deletions src/claude_agent_sdk/_internal/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ListToolsRequest,
)

from .._errors import CLIConnectionError
from ..types import (
PermissionResultAllow,
PermissionResultDeny,
Expand Down Expand Up @@ -322,7 +323,14 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None:
"response": response_data,
},
}
await self.transport.write(json.dumps(success_response) + "\n")
try:
await self.transport.write(json.dumps(success_response) + "\n")
except CLIConnectionError:
logger.debug(
"Transport closed before sending control response for %s (request_id=%s)",
subtype,
request_id,
)

except Exception as e:
# Send error response
Expand All @@ -334,7 +342,15 @@ async def _handle_control_request(self, request: SDKControlRequest) -> None:
"error": str(e),
},
}
await self.transport.write(json.dumps(error_response) + "\n")
try:
await self.transport.write(json.dumps(error_response) + "\n")
except CLIConnectionError:
logger.debug(
"Transport closed before sending error response for %s (request_id=%s): %s",
subtype,
request_id,
e,
)

async def _send_control_request(
self, request: dict[str, Any], timeout: float = 60.0
Expand Down
60 changes: 58 additions & 2 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@
_CMD_LENGTH_LIMIT = 8000 if platform.system() == "Windows" else 100000


async def _string_to_async_iterable(prompt: str) -> AsyncIterator[dict[str, Any]]:
"""Convert a string prompt to an async iterable for streaming mode.

When SDK MCP servers are present, we need streaming mode for bidirectional
communication. This helper converts a string prompt to the expected
stream-json format.

Args:
prompt: The string prompt to convert

Yields:
A single user message dict in stream-json format
"""
yield {
"type": "user",
"message": {"role": "user", "content": prompt},
"parent_tool_use_id": None,
"session_id": "default",
}


class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""

Expand All @@ -45,8 +66,22 @@ def __init__(
prompt: str | AsyncIterable[dict[str, Any]],
options: ClaudeAgentOptions,
):
self._prompt = prompt
self._is_streaming = not isinstance(prompt, str)
# Check if SDK MCP servers are present - they require streaming mode
# for bidirectional communication
has_sdk_mcp = self._has_sdk_mcp_servers(options)

# Determine streaming mode: either explicit AsyncIterable or
# forced by SDK MCP servers presence
if isinstance(prompt, str) and has_sdk_mcp:
# Convert string prompt to async iterable for SDK MCP support
self._prompt: str | AsyncIterable[dict[str, Any]] = (
_string_to_async_iterable(prompt)
)
self._is_streaming = True
else:
self._prompt = prompt
self._is_streaming = not isinstance(prompt, str)

self._options = options
self._cli_path = (
str(options.cli_path) if options.cli_path is not None else self._find_cli()
Expand All @@ -67,6 +102,27 @@ def __init__(
self._temp_files: list[str] = [] # Track temporary files for cleanup
self._write_lock: anyio.Lock = anyio.Lock()

def _has_sdk_mcp_servers(self, options: ClaudeAgentOptions) -> bool:
"""Check if any SDK MCP servers are configured.

SDK MCP servers require bidirectional communication through stdin/stdout,
so when present, streaming mode must be forced even for string prompts.

Args:
options: The agent options to check

Returns:
True if any SDK MCP server is configured, False otherwise
"""
if not options.mcp_servers:
return False
if not isinstance(options.mcp_servers, dict):
return False
return any(
isinstance(config, dict) and config.get("type") == "sdk"
for config in options.mcp_servers.values()
)

def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
# First, check for bundled CLI
Expand Down
Loading