1313from anyio .abc import TaskGroup
1414from anyio .streams .memory import MemoryObjectReceiveStream , MemoryObjectSendStream
1515from httpx_sse import EventSource , ServerSentEvent , aconnect_sse
16+ from pydantic import ValidationError
1617
1718from mcp .client ._transport import TransportStreams
1819from mcp .shared ._httpx_utils import create_mcp_http_client
1920from mcp .shared .message import ClientMessageMetadata , SessionMessage
2021from mcp .types import (
22+ INVALID_REQUEST ,
23+ PARSE_ERROR ,
2124 ErrorData ,
2225 InitializeResult ,
2326 JSONRPCError ,
@@ -163,6 +166,11 @@ async def _handle_sse_event(
163166
164167 except Exception as exc : # pragma: no cover
165168 logger .exception ("Error parsing SSE message" )
169+ if original_request_id is not None :
170+ error_data = ErrorData (code = PARSE_ERROR , message = f"Failed to parse SSE message: { exc } " )
171+ error_msg = SessionMessage (JSONRPCError (jsonrpc = "2.0" , id = original_request_id , error = error_data ))
172+ await read_stream_writer .send (error_msg )
173+ return True
166174 await read_stream_writer .send (exc )
167175 return False
168176 else : # pragma: no cover
@@ -260,7 +268,9 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
260268
261269 if response .status_code == 404 : # pragma: no branch
262270 if isinstance (message , JSONRPCRequest ): # pragma: no branch
263- await self ._send_session_terminated_error (ctx .read_stream_writer , message .id )
271+ error_data = ErrorData (code = INVALID_REQUEST , message = "Session terminated" )
272+ session_message = SessionMessage (JSONRPCError (jsonrpc = "2.0" , id = message .id , error = error_data ))
273+ await ctx .read_stream_writer .send (session_message )
264274 return
265275
266276 response .raise_for_status ()
@@ -272,20 +282,24 @@ async def _handle_post_request(self, ctx: RequestContext) -> None:
272282 if isinstance (message , JSONRPCRequest ):
273283 content_type = response .headers .get ("content-type" , "" ).lower ()
274284 if content_type .startswith ("application/json" ):
275- await self ._handle_json_response (response , ctx .read_stream_writer , is_initialization )
285+ await self ._handle_json_response (
286+ response , ctx .read_stream_writer , is_initialization , request_id = message .id
287+ )
276288 elif content_type .startswith ("text/event-stream" ):
277289 await self ._handle_sse_response (response , ctx , is_initialization )
278290 else :
279- await self . _handle_unexpected_content_type ( # pragma: no cover
280- content_type , # pragma: no cover
281- ctx . read_stream_writer , # pragma: no cover
282- ) # pragma: no cover
291+ logger . error ( f"Unexpected content type: { content_type } " )
292+ error_data = ErrorData ( code = INVALID_REQUEST , message = f"Unexpected content type: { content_type } " )
293+ error_msg = SessionMessage ( JSONRPCError ( jsonrpc = "2.0" , id = message . id , error = error_data ))
294+ await ctx . read_stream_writer . send ( error_msg )
283295
284296 async def _handle_json_response (
285297 self ,
286298 response : httpx .Response ,
287299 read_stream_writer : StreamWriter ,
288300 is_initialization : bool = False ,
301+ * ,
302+ request_id : RequestId ,
289303 ) -> None :
290304 """Handle JSON response from the server."""
291305 try :
@@ -298,9 +312,11 @@ async def _handle_json_response(
298312
299313 session_message = SessionMessage (message )
300314 await read_stream_writer .send (session_message )
301- except Exception as exc : # pragma: no cover
315+ except ( httpx . StreamError , ValidationError ) as exc :
302316 logger .exception ("Error parsing JSON response" )
303- await read_stream_writer .send (exc )
317+ error_data = ErrorData (code = PARSE_ERROR , message = f"Failed to parse JSON response: { exc } " )
318+ error_msg = SessionMessage (JSONRPCError (jsonrpc = "2.0" , id = request_id , error = error_data ))
319+ await read_stream_writer .send (error_msg )
304320
305321 async def _handle_sse_response (
306322 self ,
@@ -312,6 +328,11 @@ async def _handle_sse_response(
312328 last_event_id : str | None = None
313329 retry_interval_ms : int | None = None
314330
331+ # The caller (_handle_post_request) only reaches here inside
332+ # isinstance(message, JSONRPCRequest), so this is always a JSONRPCRequest.
333+ assert isinstance (ctx .session_message .message , JSONRPCRequest )
334+ original_request_id = ctx .session_message .message .id
335+
315336 try :
316337 event_source = EventSource (response )
317338 async for sse in event_source .aiter_sse (): # pragma: no branch
@@ -326,6 +347,7 @@ async def _handle_sse_response(
326347 is_complete = await self ._handle_sse_event (
327348 sse ,
328349 ctx .read_stream_writer ,
350+ original_request_id = original_request_id ,
329351 resumption_callback = (ctx .metadata .on_resumption_token_update if ctx .metadata else None ),
330352 is_initialization = is_initialization ,
331353 )
@@ -334,8 +356,8 @@ async def _handle_sse_response(
334356 if is_complete :
335357 await response .aclose ()
336358 return # Normal completion, no reconnect needed
337- except Exception as e :
338- logger .debug (f "SSE stream ended: { e } " ) # pragma: no cover
359+ except Exception :
360+ logger .debug ("SSE stream ended" , exc_info = True ) # pragma: no cover
339361
340362 # Stream ended without response - reconnect if we received an event with ID
341363 if last_event_id is not None : # pragma: no branch
@@ -400,24 +422,6 @@ async def _handle_reconnection(
400422 # Try to reconnect again if we still have an event ID
401423 await self ._handle_reconnection (ctx , last_event_id , retry_interval_ms , attempt + 1 )
402424
403- async def _handle_unexpected_content_type (
404- self , content_type : str , read_stream_writer : StreamWriter
405- ) -> None : # pragma: no cover
406- """Handle unexpected content type in response."""
407- error_msg = f"Unexpected content type: { content_type } " # pragma: no cover
408- logger .error (error_msg ) # pragma: no cover
409- await read_stream_writer .send (ValueError (error_msg )) # pragma: no cover
410-
411- async def _send_session_terminated_error (self , read_stream_writer : StreamWriter , request_id : RequestId ) -> None :
412- """Send a session terminated error response."""
413- jsonrpc_error = JSONRPCError (
414- jsonrpc = "2.0" ,
415- id = request_id ,
416- error = ErrorData (code = 32600 , message = "Session terminated" ),
417- )
418- session_message = SessionMessage (jsonrpc_error )
419- await read_stream_writer .send (session_message )
420-
421425 async def post_writer (
422426 self ,
423427 client : httpx .AsyncClient ,
@@ -467,8 +471,8 @@ async def handle_request_async():
467471 else :
468472 await handle_request_async ()
469473
470- except Exception :
471- logger .exception ("Error in post_writer" ) # pragma: no cover
474+ except Exception : # pragma: lax no cover
475+ logger .exception ("Error in post_writer" )
472476 finally :
473477 await read_stream_writer .aclose ()
474478 await write_stream .aclose ()
0 commit comments