Skip to content

Commit 1e4b2d3

Browse files
committed
fix: send JSONRPCError instead of bare exceptions in streamable HTTP client
When the streamable HTTP client encountered errors (unexpected content type, JSON parse failure, SSE parse failure), it sent bare Exception objects that never resolved the pending send_request() — causing the caller to hang indefinitely until timeout. Send JSONRPCError with the request's id so the response stream unblocks immediately, following the pattern already used by the 404 handler.
1 parent 1a943ad commit 1e4b2d3

File tree

3 files changed

+141
-124
lines changed

3 files changed

+141
-124
lines changed

src/mcp/client/streamable_http.py

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from mcp.shared._httpx_utils import create_mcp_http_client
1919
from mcp.shared.message import ClientMessageMetadata, SessionMessage
2020
from mcp.types import (
21+
INVALID_REQUEST,
22+
PARSE_ERROR,
2123
ErrorData,
2224
InitializeResult,
2325
JSONRPCError,
@@ -163,6 +165,11 @@ async def _handle_sse_event(
163165

164166
except Exception as exc: # pragma: no cover
165167
logger.exception("Error parsing SSE message")
168+
if original_request_id is not None:
169+
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}")
170+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data))
171+
await read_stream_writer.send(error_msg)
172+
return True
166173
await read_stream_writer.send(exc)
167174
return False
168175
else: # pragma: no cover
@@ -260,7 +267,9 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
260267

261268
if response.status_code == 404: # pragma: no branch
262269
if isinstance(message, JSONRPCRequest): # pragma: no branch
263-
await self._send_session_terminated_error(ctx.read_stream_writer, message.id)
270+
error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated")
271+
session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
272+
await ctx.read_stream_writer.send(session_message)
264273
return
265274

266275
response.raise_for_status()
@@ -272,20 +281,24 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
272281
if isinstance(message, JSONRPCRequest):
273282
content_type = response.headers.get("content-type", "").lower()
274283
if content_type.startswith("application/json"):
275-
await self._handle_json_response(response, ctx.read_stream_writer, is_initialization)
284+
await self._handle_json_response(
285+
response, ctx.read_stream_writer, is_initialization, request_id=message.id
286+
)
276287
elif content_type.startswith("text/event-stream"):
277288
await self._handle_sse_response(response, ctx, is_initialization)
278289
else:
279-
await self._handle_unexpected_content_type( # pragma: no cover
280-
content_type, # pragma: no cover
281-
ctx.read_stream_writer, # pragma: no cover
282-
) # pragma: no cover
290+
logger.error(f"Unexpected content type: {content_type}")
291+
error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}")
292+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data))
293+
await ctx.read_stream_writer.send(error_msg)
283294

284295
async def _handle_json_response(
285296
self,
286297
response: httpx.Response,
287298
read_stream_writer: StreamWriter,
288299
is_initialization: bool = False,
300+
*,
301+
request_id: RequestId,
289302
) -> None:
290303
"""Handle JSON response from the server."""
291304
try:
@@ -298,9 +311,11 @@ async def _handle_json_response(
298311

299312
session_message = SessionMessage(message)
300313
await read_stream_writer.send(session_message)
301-
except Exception as exc: # pragma: no cover
314+
except Exception as exc:
302315
logger.exception("Error parsing JSON response")
303-
await read_stream_writer.send(exc)
316+
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse JSON response: {exc}")
317+
error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data))
318+
await read_stream_writer.send(error_msg)
304319

305320
async def _handle_sse_response(
306321
self,
@@ -312,6 +327,11 @@ async def _handle_sse_response(
312327
last_event_id: str | None = None
313328
retry_interval_ms: int | None = None
314329

330+
# The caller (_handle_post_request) only reaches here inside
331+
# isinstance(message, JSONRPCRequest), so this is always a JSONRPCRequest.
332+
assert isinstance(ctx.session_message.message, JSONRPCRequest)
333+
original_request_id = ctx.session_message.message.id
334+
315335
try:
316336
event_source = EventSource(response)
317337
async for sse in event_source.aiter_sse(): # pragma: no branch
@@ -326,6 +346,7 @@ async def _handle_sse_response(
326346
is_complete = await self._handle_sse_event(
327347
sse,
328348
ctx.read_stream_writer,
349+
original_request_id=original_request_id,
329350
resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None),
330351
is_initialization=is_initialization,
331352
)
@@ -334,8 +355,8 @@ async def _handle_sse_response(
334355
if is_complete:
335356
await response.aclose()
336357
return # Normal completion, no reconnect needed
337-
except Exception as e:
338-
logger.debug(f"SSE stream ended: {e}") # pragma: no cover
358+
except Exception:
359+
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
339360

340361
# Stream ended without response - reconnect if we received an event with ID
341362
if last_event_id is not None: # pragma: no branch
@@ -400,24 +421,6 @@ async def _handle_reconnection(
400421
# Try to reconnect again if we still have an event ID
401422
await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1)
402423

403-
async def _handle_unexpected_content_type(
404-
self, content_type: str, read_stream_writer: StreamWriter
405-
) -> None: # pragma: no cover
406-
"""Handle unexpected content type in response."""
407-
error_msg = f"Unexpected content type: {content_type}" # pragma: no cover
408-
logger.error(error_msg) # pragma: no cover
409-
await read_stream_writer.send(ValueError(error_msg)) # pragma: no cover
410-
411-
async def _send_session_terminated_error(self, read_stream_writer: StreamWriter, request_id: RequestId) -> None:
412-
"""Send a session terminated error response."""
413-
jsonrpc_error = JSONRPCError(
414-
jsonrpc="2.0",
415-
id=request_id,
416-
error=ErrorData(code=32600, message="Session terminated"),
417-
)
418-
session_message = SessionMessage(jsonrpc_error)
419-
await read_stream_writer.send(session_message)
420-
421424
async def post_writer(
422425
self,
423426
client: httpx.AsyncClient,

src/mcp/types/jsonrpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class JSONRPCError(BaseModel):
7575
"""A response to a request that indicates an error occurred."""
7676

7777
jsonrpc: Literal["2.0"]
78-
id: str | int
78+
id: RequestId
7979
error: ErrorData
8080

8181

tests/client/test_notification_response.py

Lines changed: 109 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -5,133 +5,147 @@
55
"""
66

77
import json
8-
import multiprocessing
9-
import socket
10-
from collections.abc import Generator
118

9+
import httpx
1210
import pytest
13-
import uvicorn
1411
from starlette.applications import Starlette
1512
from starlette.requests import Request
1613
from starlette.responses import JSONResponse, Response
1714
from starlette.routing import Route
1815

19-
from mcp import ClientSession, types
16+
from mcp import ClientSession, MCPError, types
2017
from mcp.client.streamable_http import streamable_http_client
2118
from mcp.shared.session import RequestResponder
2219
from mcp.types import RootsListChangedNotification
23-
from tests.test_helpers import wait_for_server
2420

21+
pytestmark = pytest.mark.anyio
2522

26-
def create_non_sdk_server_app() -> Starlette: # pragma: no cover
23+
INIT_RESPONSE = {
24+
"serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"},
25+
"protocolVersion": "2024-11-05",
26+
"capabilities": {},
27+
}
28+
29+
30+
def _init_json_response(data: dict[str, object]) -> JSONResponse:
31+
return JSONResponse({"jsonrpc": "2.0", "id": data["id"], "result": INIT_RESPONSE})
32+
33+
34+
def _create_non_sdk_server_app() -> Starlette:
2735
"""Create a minimal server that doesn't follow SDK conventions."""
2836

2937
async def handle_mcp_request(request: Request) -> Response:
30-
"""Handle MCP requests with non-standard responses."""
31-
try:
32-
body = await request.body()
33-
data = json.loads(body)
34-
35-
# Handle initialize request normally
36-
if data.get("method") == "initialize":
37-
response_data = {
38-
"jsonrpc": "2.0",
39-
"id": data["id"],
40-
"result": {
41-
"serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"},
42-
"protocolVersion": "2024-11-05",
43-
"capabilities": {},
44-
},
45-
}
46-
return JSONResponse(response_data)
47-
48-
# For notifications, return 204 No Content (non-SDK behavior)
49-
if "id" not in data:
50-
return Response(status_code=204, headers={"Content-Type": "application/json"})
51-
52-
# Default response for other requests
53-
return JSONResponse(
54-
{"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}}
55-
)
56-
57-
except Exception as e:
58-
return JSONResponse({"error": f"Server error: {str(e)}"}, status_code=500)
59-
60-
app = Starlette(
61-
debug=True,
62-
routes=[
63-
Route("/mcp", handle_mcp_request, methods=["POST"]),
64-
],
65-
)
66-
return app
67-
68-
69-
def run_non_sdk_server(port: int) -> None: # pragma: no cover
70-
"""Run the non-SDK server in a separate process."""
71-
app = create_non_sdk_server_app()
72-
config = uvicorn.Config(
73-
app=app,
74-
host="127.0.0.1",
75-
port=port,
76-
log_level="error", # Reduce noise in tests
77-
)
78-
server = uvicorn.Server(config=config)
79-
server.run()
80-
81-
82-
@pytest.fixture
83-
def non_sdk_server_port() -> int:
84-
"""Get an available port for the test server."""
85-
with socket.socket() as s:
86-
s.bind(("127.0.0.1", 0))
87-
return s.getsockname()[1]
88-
89-
90-
@pytest.fixture
91-
def non_sdk_server(non_sdk_server_port: int) -> Generator[None, None, None]:
92-
"""Start a non-SDK server for testing."""
93-
proc = multiprocessing.Process(target=run_non_sdk_server, kwargs={"port": non_sdk_server_port}, daemon=True)
94-
proc.start()
95-
96-
# Wait for server to be ready
97-
try:
98-
wait_for_server(non_sdk_server_port, timeout=10.0)
99-
except TimeoutError: # pragma: no cover
100-
proc.kill()
101-
proc.join(timeout=2)
102-
pytest.fail("Server failed to start within 10 seconds")
103-
104-
yield
105-
106-
proc.kill()
107-
proc.join(timeout=2)
108-
109-
110-
@pytest.mark.anyio
111-
async def test_non_compliant_notification_response(non_sdk_server: None, non_sdk_server_port: int) -> None:
112-
"""This test verifies that the client ignores unexpected responses to notifications: the spec states they should
113-
either be 202 + no response body, or 4xx + optional error body
38+
body = await request.body()
39+
data = json.loads(body)
40+
41+
if data.get("method") == "initialize":
42+
return _init_json_response(data)
43+
44+
# For notifications, return 204 No Content (non-SDK behavior)
45+
if "id" not in data:
46+
return Response(status_code=204, headers={"Content-Type": "application/json"})
47+
48+
return JSONResponse(
49+
{"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}}
50+
)
51+
52+
return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])
53+
54+
55+
def _create_unexpected_content_type_app() -> Starlette:
56+
"""Create a server that returns an unexpected content type for requests."""
57+
58+
async def handle_mcp_request(request: Request) -> Response:
59+
body = await request.body()
60+
data = json.loads(body)
61+
62+
if data.get("method") == "initialize":
63+
return _init_json_response(data)
64+
65+
if "id" not in data:
66+
return Response(status_code=202)
67+
68+
# Return text/plain for all other requests — an unexpected content type.
69+
return Response(content="this is plain text, not json or sse", status_code=200, media_type="text/plain")
70+
71+
return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])
72+
73+
74+
async def test_non_compliant_notification_response() -> None:
75+
"""Verify the client ignores unexpected responses to notifications.
76+
77+
The spec states notifications should get either 202 + no response body, or 4xx + optional error body
11478
(https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server),
11579
but some servers wrongly return other 2xx codes (e.g. 204). For now we simply ignore unexpected responses
11680
(aligning behaviour w/ the TS SDK).
11781
"""
118-
server_url = f"http://127.0.0.1:{non_sdk_server_port}/mcp"
11982
returned_exception = None
12083

12184
async def message_handler( # pragma: no cover
12285
message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception,
123-
):
86+
) -> None:
12487
nonlocal returned_exception
12588
if isinstance(message, Exception):
12689
returned_exception = message
12790

128-
async with streamable_http_client(server_url) as (read_stream, write_stream):
91+
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app()))
92+
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
12993
async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session:
130-
# Initialize should work normally
13194
await session.initialize()
13295

13396
# The test server returns a 204 instead of the expected 202
13497
await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed"))
13598

13699
if returned_exception: # pragma: no cover
137100
pytest.fail(f"Server encountered an exception: {returned_exception}")
101+
102+
103+
async def test_unexpected_content_type_sends_jsonrpc_error() -> None:
104+
"""Verify unexpected content types unblock the pending request with an MCPError.
105+
106+
When a server returns a content type that is neither application/json nor text/event-stream,
107+
the client should send a JSONRPCError so the pending request resolves immediately
108+
instead of hanging until timeout.
109+
"""
110+
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_unexpected_content_type_app()))
111+
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
112+
async with ClientSession(read_stream, write_stream) as session:
113+
await session.initialize()
114+
115+
with pytest.raises(MCPError, match="Unexpected content type: text/plain"):
116+
await session.list_tools()
117+
118+
119+
def _create_invalid_json_response_app() -> Starlette:
120+
"""Create a server that returns invalid JSON for requests."""
121+
122+
async def handle_mcp_request(request: Request) -> Response:
123+
body = await request.body()
124+
data = json.loads(body)
125+
126+
if data.get("method") == "initialize":
127+
return _init_json_response(data)
128+
129+
if "id" not in data:
130+
return Response(status_code=202)
131+
132+
# Return application/json content type but with invalid JSON body.
133+
return Response(content="not valid json{{{", status_code=200, media_type="application/json")
134+
135+
return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])])
136+
137+
138+
async def test_invalid_json_response_sends_jsonrpc_error() -> None:
139+
"""Verify invalid JSON responses unblock the pending request with an MCPError.
140+
141+
When a server returns application/json with an unparseable body, the client
142+
should send a JSONRPCError so the pending request resolves immediately
143+
instead of hanging until timeout.
144+
"""
145+
client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_invalid_json_response_app()))
146+
async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream):
147+
async with ClientSession(read_stream, write_stream) as session:
148+
await session.initialize()
149+
150+
with pytest.raises(MCPError, match="Failed to parse JSON response"):
151+
await session.list_tools()

0 commit comments

Comments
 (0)