1010
1111
1212@pytest .fixture
13- def create_streams ():
14- """Helper fixture to create memory streams for testing."""
13+ async def create_streams ():
14+ """Helper fixture to create memory streams for testing with proper cleanup."""
15+ streams_to_cleanup = []
1516
1617 def _create ():
1718 client_read_writer , client_read = anyio .create_memory_object_stream [SessionMessage | Exception ](10 )
@@ -20,14 +21,26 @@ def _create():
2021 server_read_writer , server_read = anyio .create_memory_object_stream [SessionMessage | Exception ](10 )
2122 server_write , server_write_reader = anyio .create_memory_object_stream [SessionMessage ](10 )
2223
24+ # Track all streams for cleanup
25+ streams_to_cleanup .extend (
26+ [client_read_writer , client_write_reader , server_read_writer , server_write_reader ]
27+ )
28+
2329 return (
2430 (client_read , client_write ),
2531 (server_read , server_write ),
2632 (client_read_writer , client_write_reader ),
2733 (server_read_writer , server_write_reader ),
2834 )
2935
30- return _create
36+ yield _create
37+
38+ # Clean up any unclosed streams after the test
39+ for stream in streams_to_cleanup :
40+ try :
41+ await stream .aclose ()
42+ except Exception :
43+ pass # Already closed
3144
3245
3346@pytest .mark .anyio
@@ -245,7 +258,7 @@ async def failing_transform(msg: SessionMessage) -> SessionMessage | None:
245258
246259@pytest .mark .anyio
247260async def test_proxy_cleans_up_streams (create_streams ):
248- """Test that all streams are closed when proxy exits ."""
261+ """Test that proxy exits cleanly and doesn't interfere with stream lifecycle ."""
249262 (
250263 client_streams ,
251264 server_streams ,
@@ -259,23 +272,19 @@ async def test_proxy_cleans_up_streams(create_streams):
259272 async with mcp_proxy (client_streams , server_streams ):
260273 pass # Exit immediately
261274
262- # All streams should be closed
263- # Attempting to send/receive should fail
264- with pytest .raises (anyio .ClosedResourceError ):
265- await client_read_writer .send (
266- SessionMessage (JSONRPCMessage (JSONRPCRequest (jsonrpc = "2.0" , id = "1" , method = "test" , params = {})))
267- )
268-
269- with pytest .raises ((anyio .ClosedResourceError , anyio .EndOfStream )):
270- await client_write_reader .receive ()
275+ # Streams should still be open (proxy doesn't own them)
276+ # The caller is responsible for closing streams
277+ request = JSONRPCRequest (jsonrpc = "2.0" , id = "1" , method = "test" , params = {})
278+ message = SessionMessage (JSONRPCMessage (request ))
271279
272- with pytest .raises (anyio .ClosedResourceError ):
273- await server_read_writer .send (
274- SessionMessage (JSONRPCMessage (JSONRPCRequest (jsonrpc = "2.0" , id = "2" , method = "test" , params = {})))
275- )
280+ # We can still send to the streams (they're not closed by proxy)
281+ await client_read_writer .send (message )
276282
277- with pytest .raises ((anyio .ClosedResourceError , anyio .EndOfStream )):
278- await server_write_reader .receive ()
283+ # Now manually clean up as the test would normally do
284+ await client_read_writer .aclose ()
285+ await client_write_reader .aclose ()
286+ await server_read_writer .aclose ()
287+ await server_write_reader .aclose ()
279288
280289
281290@pytest .mark .anyio
@@ -297,3 +306,4 @@ async def test_proxy_multiple_messages(create_streams):
297306 assert received .message .root .id == str (i )
298307 assert received .message .root .method == f"method_{ i } "
299308
309+
0 commit comments