From 1606c96b4c9ad7d57b0c16a62f55a88de44ba046 Mon Sep 17 00:00:00 2001 From: aaronabbott Date: Wed, 4 Feb 2026 05:22:29 +0000 Subject: [PATCH 1/3] Test case showing failure at main --- .../test_streamable_http_streams_closed.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 tests/server/test_streamable_http_streams_closed.py diff --git a/tests/server/test_streamable_http_streams_closed.py b/tests/server/test_streamable_http_streams_closed.py new file mode 100644 index 000000000..81c4447e4 --- /dev/null +++ b/tests/server/test_streamable_http_streams_closed.py @@ -0,0 +1,34 @@ +import httpx +import pytest + +from mcp.client.session import ClientSession +from mcp.client.streamable_http import streamable_http_client +from mcp.server import MCPServer + + +@pytest.fixture +def server() -> MCPServer: + mcp = MCPServer("test_server") + + @mcp.tool() + async def my_tool() -> str: + return "test" + + return mcp + + +HOST = "testserver" + + +@pytest.mark.anyio +async def test_streamable_http_server_cleanup(server: MCPServer): + mcp_app = server.streamable_http_app(host=HOST) + async with ( + mcp_app.router.lifespan_context(mcp_app), + httpx.ASGITransport(mcp_app) as transport, + httpx.AsyncClient(transport=transport) as client, + streamable_http_client(f"http://{HOST}/mcp", http_client=client) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): + await session.initialize() + await session.call_tool("my_tool", arguments={}) From bbcfbf6e6912f8e8d9f992845416bdc503ab4e06 Mon Sep 17 00:00:00 2001 From: aaronabbott Date: Wed, 4 Feb 2026 00:00:19 +0000 Subject: [PATCH 2/3] Fix leaked anyio streams --- src/mcp/server/streamable_http.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index e9156f7ba..212f15136 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -427,7 +427,7 @@ async def _validate_accept_header(self, request: Request, scope: Scope, send: Se return False return True - async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: + async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None: # noqa: PLR0915 """Handle POST requests containing JSON-RPC messages.""" writer = self._read_stream_writer if writer is None: # pragma: no cover @@ -620,8 +620,9 @@ async def sse_writer(): except Exception: logger.exception("SSE response error") await sse_stream_writer.aclose() - await sse_stream_reader.aclose() await self._clean_up_memory_streams(request_id) + finally: + await sse_stream_reader.aclose() except Exception as err: # pragma: no cover logger.exception("Error handling POST request") @@ -722,9 +723,10 @@ async def standalone_sse_writer(): await response(request.scope, request.receive, send) except Exception: logger.exception("Error in standalone SSE response") + await self._clean_up_memory_streams(GET_STREAM_KEY) + finally: await sse_stream_writer.aclose() await sse_stream_reader.aclose() - await self._clean_up_memory_streams(GET_STREAM_KEY) async def _handle_delete_request(self, request: Request, send: Send) -> None: # pragma: no cover """Handle DELETE requests for explicit session termination.""" From de5d6244df645de30eecc7b3024277cab982922d Mon Sep 17 00:00:00 2001 From: aaronabbott Date: Wed, 4 Feb 2026 06:07:47 +0000 Subject: [PATCH 3/3] Update "pragma: no cover" locations for improved coverage --- src/mcp/server/lowlevel/server.py | 4 +- src/mcp/server/streamable_http.py | 66 +++++++++++------------ src/mcp/server/streamable_http_manager.py | 4 +- 3 files changed, 36 insertions(+), 38 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 96dcaf1c7..7bd79bb37 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -739,9 +739,7 @@ async def _handle_request( request_data = None close_sse_stream_cb = None close_standalone_sse_stream_cb = None - if message.message_metadata is not None and isinstance( - message.message_metadata, ServerMessageMetadata - ): # pragma: no cover + if message.message_metadata is not None and isinstance(message.message_metadata, ServerMessageMetadata): request_data = message.message_metadata.request_context close_sse_stream_cb = message.message_metadata.close_sse_stream close_standalone_sse_stream_cb = message.message_metadata.close_standalone_sse_stream diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 212f15136..dc68bc3fc 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -328,11 +328,11 @@ def _create_json_response( headers=response_headers, ) - def _get_session_id(self, request: Request) -> str | None: # pragma: no cover + def _get_session_id(self, request: Request) -> str | None: """Extract the session ID from request headers.""" return request.headers.get(MCP_SESSION_ID_HEADER) - def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: # pragma: no cover + def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: """Create event data dictionary from an EventMessage.""" event_data = { "event": "message", @@ -340,7 +340,7 @@ def _create_event_data(self, event_message: EventMessage) -> dict[str, str]: # } # If an event ID was provided, include it - if event_message.event_id: + if event_message.event_id: # pragma: no cover event_data["id"] = event_message.event_id return event_data @@ -381,9 +381,9 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No if request.method == "POST": await self._handle_post_request(scope, request, receive, send) - elif request.method == "GET": # pragma: no cover + elif request.method == "GET": await self._handle_get_request(request, send) - elif request.method == "DELETE": # pragma: no cover + elif request.method == "DELETE": await self._handle_delete_request(request, send) else: # pragma: no cover await self._handle_unsupported_request(request, send) @@ -470,14 +470,14 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re # Check if this is an initialization request is_initialization_request = isinstance(message, JSONRPCRequest) and message.method == "initialize" - if is_initialization_request: # pragma: no cover + if is_initialization_request: # Check if the server already has an established session if self.mcp_session_id: # Check if request has a session ID request_session_id = self._get_session_id(request) # If request has a session ID but doesn't match, return 404 - if request_session_id and request_session_id != self.mcp_session_id: + if request_session_id and request_session_id != self.mcp_session_id: # pragma: no cover response = self._create_error_response( "Not Found: Invalid or expired session ID", HTTPStatus.NOT_FOUND, @@ -488,7 +488,7 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re return # For notifications and responses only, return 202 Accepted - if not isinstance(message, JSONRPCRequest): # pragma: no cover + if not isinstance(message, JSONRPCRequest): # Create response object and send it response = self._create_json_response( None, @@ -561,14 +561,14 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re await response(scope, receive, send) finally: await self._clean_up_memory_streams(request_id) - else: # pragma: no cover + else: # Create SSE stream sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0) # Store writer reference so close_sse_stream() can close it self._sse_stream_writers[request_id] = sse_stream_writer - async def sse_writer(): + async def sse_writer(): # pragma: lax no cover # Get the request ID from the incoming request message try: async with sse_stream_writer, request_stream_reader: @@ -617,7 +617,7 @@ async def sse_writer(): # Then send the message to be processed by the server session_message = self._create_session_message(message, request, request_id, protocol_version) await writer.send(session_message) - except Exception: + except Exception: # pragma: no cover logger.exception("SSE response error") await sse_stream_writer.aclose() await self._clean_up_memory_streams(request_id) @@ -636,7 +636,7 @@ async def sse_writer(): await writer.send(Exception(err)) return - async def _handle_get_request(self, request: Request, send: Send) -> None: # pragma: no cover + async def _handle_get_request(self, request: Request, send: Send) -> None: """Handle GET request to establish SSE. This allows the server to communicate to the client without the client @@ -644,13 +644,13 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr and notifications on this stream. """ writer = self._read_stream_writer - if writer is None: + if writer is None: # pragma: no cover raise ValueError("No read stream writer available. Ensure connect() is called first.") # Validate Accept header - must include text/event-stream _, has_sse = self._check_accept_headers(request) - if not has_sse: + if not has_sse: # pragma: no cover response = self._create_error_response( "Not Acceptable: Client must accept text/event-stream", HTTPStatus.NOT_ACCEPTABLE, @@ -658,11 +658,11 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr await response(request.scope, request.receive, send) return - if not await self._validate_request_headers(request, send): + if not await self._validate_request_headers(request, send): # pragma: no cover return # Handle resumability: check for Last-Event-ID header - if last_event_id := request.headers.get(LAST_EVENT_ID_HEADER): + if last_event_id := request.headers.get(LAST_EVENT_ID_HEADER): # pragma: no cover await self._replay_events(last_event_id, request, send) return @@ -676,7 +676,7 @@ async def _handle_get_request(self, request: Request, send: Send) -> None: # pr headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id # Check if we already have an active GET stream - if GET_STREAM_KEY in self._request_streams: + if GET_STREAM_KEY in self._request_streams: # pragma: no cover response = self._create_error_response( "Conflict: Only one SSE stream is allowed per session", HTTPStatus.CONFLICT, @@ -696,7 +696,7 @@ async def standalone_sse_writer(): async with sse_stream_writer, standalone_stream_reader: # Process messages from the standalone stream - async for event_message in standalone_stream_reader: + async for event_message in standalone_stream_reader: # pragma: lax no cover # For the standalone stream, we handle: # - JSONRPCNotification (server sends notifications to client) # - JSONRPCRequest (server sends requests to client) @@ -705,7 +705,7 @@ async def standalone_sse_writer(): # Send the message via SSE event_data = self._create_event_data(event_message) await sse_stream_writer.send(event_data) - except Exception: + except Exception: # pragma: no cover logger.exception("Error in standalone SSE writer") finally: logger.debug("Closing standalone SSE writer") @@ -721,17 +721,17 @@ async def standalone_sse_writer(): try: # This will send headers immediately and establish the SSE connection await response(request.scope, request.receive, send) - except Exception: + except Exception: # pragma: lax no cover logger.exception("Error in standalone SSE response") await self._clean_up_memory_streams(GET_STREAM_KEY) finally: await sse_stream_writer.aclose() await sse_stream_reader.aclose() - async def _handle_delete_request(self, request: Request, send: Send) -> None: # pragma: no cover + async def _handle_delete_request(self, request: Request, send: Send) -> None: """Handle DELETE requests for explicit session termination.""" # Validate session ID - if not self.mcp_session_id: + if not self.mcp_session_id: # pragma: no cover # If no session ID set, return Method Not Allowed response = self._create_error_response( "Method Not Allowed: Session termination not supported", @@ -740,7 +740,7 @@ async def _handle_delete_request(self, request: Request, send: Send) -> None: # await response(request.scope, request.receive, send) return - if not await self._validate_request_headers(request, send): + if not await self._validate_request_headers(request, send): # pragma: no cover return await self.terminate() @@ -798,16 +798,16 @@ async def _handle_unsupported_request(self, request: Request, send: Send) -> Non ) await response(request.scope, request.receive, send) - async def _validate_request_headers(self, request: Request, send: Send) -> bool: # pragma: no cover + async def _validate_request_headers(self, request: Request, send: Send) -> bool: # pragma: lax no cover if not await self._validate_session(request, send): return False if not await self._validate_protocol_version(request, send): return False return True - async def _validate_session(self, request: Request, send: Send) -> bool: # pragma: no cover + async def _validate_session(self, request: Request, send: Send) -> bool: """Validate the session ID in the request.""" - if not self.mcp_session_id: + if not self.mcp_session_id: # pragma: no cover # If we're not using session IDs, return True return True @@ -815,7 +815,7 @@ async def _validate_session(self, request: Request, send: Send) -> bool: # prag request_session_id = self._get_session_id(request) # If no session ID provided but required, return error - if not request_session_id: + if not request_session_id: # pragma: no cover response = self._create_error_response( "Bad Request: Missing session ID", HTTPStatus.BAD_REQUEST, @@ -824,7 +824,7 @@ async def _validate_session(self, request: Request, send: Send) -> bool: # prag return False # If session ID doesn't match, return error - if request_session_id != self.mcp_session_id: + if request_session_id != self.mcp_session_id: # pragma: no cover response = self._create_error_response( "Not Found: Invalid or expired session ID", HTTPStatus.NOT_FOUND, @@ -834,17 +834,17 @@ async def _validate_session(self, request: Request, send: Send) -> bool: # prag return True - async def _validate_protocol_version(self, request: Request, send: Send) -> bool: # pragma: no cover + async def _validate_protocol_version(self, request: Request, send: Send) -> bool: """Validate the protocol version header in the request.""" # Get the protocol version from the request headers protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER) # If no protocol version provided, assume default version - if protocol_version is None: + if protocol_version is None: # pragma: no cover protocol_version = DEFAULT_NEGOTIATED_VERSION # Check if the protocol version is supported - if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS: + if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS: # pragma: no cover supported_versions = ", ".join(SUPPORTED_PROTOCOL_VERSIONS) response = self._create_error_response( f"Bad Request: Unsupported protocol version: {protocol_version}. " @@ -1006,10 +1006,10 @@ async def message_router(): try: # Send both the message and the event ID await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id)) - except ( # pragma: no cover + except ( anyio.BrokenResourceError, anyio.ClosedResourceError, - ): + ): # pragma: no cover # Stream might be closed, remove from registry self._request_streams.pop(request_stream_id, None) else: # pragma: no cover diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index a954b24a4..ddc6e5014 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -181,7 +181,7 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S request_mcp_session_id = request.headers.get(MCP_SESSION_ID_HEADER) # Existing session case - if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover + if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") await transport.handle_request(scope, receive, send) @@ -261,5 +261,5 @@ class StreamableHTTPASGIApp: def __init__(self, session_manager: StreamableHTTPSessionManager): self.session_manager = session_manager - async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: # pragma: no cover + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: await self.session_manager.handle_request(scope, receive, send)