From dbe7abf192974d22d292077b7d3935d647da29a9 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 22 Jul 2025 10:31:18 +0200 Subject: [PATCH 1/4] Simplify code on `stdio_client` --- src/mcp/client/stdio/__init__.py | 52 ++++++++++---------------------- src/mcp/shared/session.py | 16 +++------- 2 files changed, 20 insertions(+), 48 deletions(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index 50bceddec8..b997a52920 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -1,14 +1,13 @@ import logging import os import sys -from contextlib import asynccontextmanager +from contextlib import AsyncExitStack, asynccontextmanager from pathlib import Path from typing import Literal, TextIO import anyio import anyio.lowlevel from anyio.abc import Process -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from anyio.streams.text import TextReceiveStream from pydantic import BaseModel, Field @@ -107,33 +106,19 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout. """ - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] + read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0) + write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] + command = _get_executable_command(server.command) - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) - - try: - command = _get_executable_command(server.command) - - # Open process with stderr piped for capture - process = await _create_platform_compatible_process( - command=command, - args=server.args, - env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()), - errlog=errlog, - cwd=server.cwd, - ) - except OSError: - # Clean up streams if process creation fails - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() - raise + # Open process with stderr piped for capture + process = await _create_platform_compatible_process( + command=command, + args=server.args, + env=({**get_default_environment(), **server.env} if server.env is not None else get_default_environment()), + errlog=errlog, + cwd=server.cwd, + ) async def stdout_reader(): assert process.stdout, "Opened process is missing stdout" @@ -177,14 +162,13 @@ async def stdin_writer(): except anyio.ClosedResourceError: await anyio.lowlevel.checkpoint() - async with ( - anyio.create_task_group() as tg, - process, - ): + async with anyio.create_task_group() as tg, process: tg.start_soon(stdout_reader) tg.start_soon(stdin_writer) + try: - yield read_stream, write_stream + async with read_stream, write_stream: + yield read_stream, write_stream finally: # MCP spec: stdio shutdown sequence # 1. Close input stream to server @@ -208,10 +192,6 @@ async def stdin_writer(): except ProcessLookupError: # Process already exited, which is fine pass - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() def _get_executable_command(command: str) -> str: diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index b2f49fc8bc..330f8cdd0d 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -252,12 +252,7 @@ async def send_request( self._progress_callbacks[request_id] = progress_callback try: - jsonrpc_request = JSONRPCRequest( - jsonrpc="2.0", - id=request_id, - **request_data, - ) - + jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) await self._write_stream.send(SessionMessage(message=JSONRPCMessage(jsonrpc_request), metadata=metadata)) # request read timeout takes precedence over session read timeout @@ -329,10 +324,7 @@ async def _send_response(self, request_id: RequestId, response: SendResultT | Er await self._write_stream.send(session_message) async def _receive_loop(self) -> None: - async with ( - self._read_stream, - self._write_stream, - ): + async with self._read_stream, self._write_stream: try: async for message in self._read_stream: if isinstance(message, Exception): @@ -418,10 +410,10 @@ async def _receive_loop(self) -> None: # Without this handler, the exception would propagate up and # crash the server's task group. logging.debug("Read stream closed by client") - except Exception as e: + except Exception: # Other exceptions are not expected and should be logged. We purposefully # catch all exceptions here to avoid crashing the server. - logging.exception(f"Unhandled exception in receive loop: {e}") + logging.exception("Unhandled exception in receive loop") finally: # after the read stream is closed, we need to send errors # to any pending requests From cc1f5446c45ff79d2cb6d677577cdedaeacc46b5 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 22 Jul 2025 10:47:52 +0200 Subject: [PATCH 2/4] readd aclose --- src/mcp/client/stdio/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index b997a52920..225c67c3d1 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -1,7 +1,7 @@ import logging import os import sys -from contextlib import AsyncExitStack, asynccontextmanager +from contextlib import asynccontextmanager from pathlib import Path from typing import Literal, TextIO @@ -192,6 +192,8 @@ async def stdin_writer(): except ProcessLookupError: # Process already exited, which is fine pass + await read_stream_writer.aclose() + await write_stream_reader.aclose() def _get_executable_command(command: str) -> str: From 25498aaeda2791c3d72f9e75372c34e67296d8cd Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 22 Jul 2025 10:51:43 +0200 Subject: [PATCH 3/4] readd aclose --- src/mcp/client/stdio/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index 225c67c3d1..a313df13bd 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -167,8 +167,7 @@ async def stdin_writer(): tg.start_soon(stdin_writer) try: - async with read_stream, write_stream: - yield read_stream, write_stream + yield read_stream, write_stream finally: # MCP spec: stdio shutdown sequence # 1. Close input stream to server @@ -194,6 +193,8 @@ async def stdin_writer(): pass await read_stream_writer.aclose() await write_stream_reader.aclose() + await read_stream.aclose() + await write_stream.aclose() def _get_executable_command(command: str) -> str: From a179cccc440c4470e834b5243c272c6f8ebb46b1 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Tue, 22 Jul 2025 10:56:33 +0200 Subject: [PATCH 4/4] readd aclose --- src/mcp/client/stdio/__init__.py | 4 ++-- tests/client/test_stdio.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/mcp/client/stdio/__init__.py b/src/mcp/client/stdio/__init__.py index a313df13bd..030bc889a0 100644 --- a/src/mcp/client/stdio/__init__.py +++ b/src/mcp/client/stdio/__init__.py @@ -191,10 +191,10 @@ async def stdin_writer(): except ProcessLookupError: # Process already exited, which is fine pass - await read_stream_writer.aclose() - await write_stream_reader.aclose() await read_stream.aclose() await write_stream.aclose() + await read_stream_writer.aclose() + await write_stream_reader.aclose() def _get_executable_command(command: str) -> str: diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 2abb42e5cd..a424cbc51e 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -118,7 +118,7 @@ async def test_stdio_client_universal_cleanup(): """ import time import sys - + # Simulate a long-running process for i in range(100): time.sleep(0.1) @@ -532,7 +532,7 @@ async def test_stdio_client_graceful_stdin_exit(): script_content = textwrap.dedent( """ import sys - + # Read from stdin until it's closed try: while True: @@ -541,7 +541,7 @@ async def test_stdio_client_graceful_stdin_exit(): break except: pass - + # Exit gracefully sys.exit(0) """ @@ -590,16 +590,16 @@ async def test_stdio_client_stdin_close_ignored(): import signal import sys import time - + # Set up SIGTERM handler to exit cleanly def sigterm_handler(signum, frame): sys.exit(0) - + signal.signal(signal.SIGTERM, sigterm_handler) - + # Close stdin immediately to simulate ignoring it sys.stdin.close() - + # Keep running until SIGTERM while True: time.sleep(0.1)