@@ -603,6 +603,66 @@ def test_streamable_http_transport_init_validation():
603603 StreamableHTTPServerTransport (mcp_session_id = "test\n " )
604604
605605
606+ @pytest .mark .anyio
607+ async def test_streamable_http_mid_call_disconnect (basic_server_port : int ):
608+ """A long-running tool call should error quickly if the server dies mid-call.
609+
610+ We use the existing server implementation's `wait_for_lock_with_notification` tool which
611+ blocks waiting on an internal lock. We kill the server process after the tool starts and
612+ assert the client surfaces a CONNECTION_CLOSED McpError instead of hanging.
613+ """
614+
615+ # Launch a dedicated server process (don't reuse basic_server fixture so we can kill it)
616+ proc = multiprocessing .Process (target = run_server , kwargs = {"port" : basic_server_port }, daemon = True )
617+ proc .start ()
618+
619+ # Wait for server readiness (reuse pattern from fixtures)
620+ start_time = time .time ()
621+ while time .time () - start_time < 10 :
622+ try :
623+ with socket .create_connection (("127.0.0.1" , basic_server_port ), timeout = 0.2 ):
624+ break
625+ except OSError :
626+ time .sleep (0.1 )
627+ else : # pragma: no cover
628+ proc .kill (); proc .join (timeout = 2 )
629+ pytest .fail ("Server failed to start in time" )
630+
631+ server_url = f"http://127.0.0.1:{ basic_server_port } /mcp"
632+
633+ try :
634+ async with streamablehttp_client (server_url ) as (read_stream , write_stream , _ ):
635+ async with ClientSession (read_stream , write_stream ) as session :
636+ await session .initialize ()
637+ await session .list_tools ()
638+
639+ result : dict [str , object ] = {}
640+
641+ async def invoke ():
642+ try :
643+ await session .call_tool ("wait_for_lock_with_notification" , {}) # pragma: no cover
644+ result ["ok" ] = True
645+ except McpError as e :
646+ result ["err" ] = e
647+
648+ async with anyio .create_task_group () as tg :
649+ tg .start_soon (invoke )
650+ # Give the request a moment to reach the server & tool to start (it sends a log notification)
651+ await anyio .sleep (0.6 )
652+ proc .kill (); proc .join (timeout = 2 )
653+ # Wait for propagated disconnect
654+ with anyio .fail_after (10 ):
655+ while "err" not in result :
656+ await anyio .sleep (0.2 )
657+
658+ err = result .get ("err" )
659+ assert isinstance (err , McpError ), "Expected McpError from mid-call disconnect"
660+ assert err .error .code == types .CONNECTION_CLOSED
661+ finally :
662+ if proc .is_alive (): # Safety cleanup
663+ proc .kill (); proc .join (timeout = 2 )
664+
665+
606666def test_session_termination (basic_server : None , basic_server_url : str ):
607667 """Test session termination via DELETE and subsequent request handling."""
608668 response = requests .post (
0 commit comments