2626
2727class MockResponse :
2828 """Simple mock to track if aclose() gets called"""
29+
2930 def __init__ (self ):
3031 self .closed = False
3132 self .close_count = 0
32-
33+
3334 async def aclose (self ):
3435 self .closed = True
3536 self .close_count += 1
@@ -38,27 +39,29 @@ async def aclose(self):
3839
3940class MockEventSource :
4041 """Mock that throws an exception to simulate broken SSE"""
42+
4143 def __init__ (self , response ):
4244 self .response = response
43-
45+
4446 def __aiter__ (self ):
4547 return self
46-
48+
4749 async def __anext__ (self ):
4850 # Simulate what happens when SSE parsing fails
4951 raise Exception ("SSE parsing failed - connection broken" )
5052
5153
5254class MockTransport (StreamableHTTPTransport ):
5355 """Mock that shows the same bug as the real code"""
56+
5457 def __init__ (self ):
5558 super ().__init__ ("http://test" )
5659 self .mock_response = MockResponse ()
57-
60+
5861 async def _handle_sse_response (self , response , ctx , is_initialization = False ):
5962 """
6063 This mimics the actual bug in the real code.
61-
64+
6265 The problem: when the async for loop throws an exception,
6366 response.aclose() never gets called because it's only in the success path.
6467 """
@@ -74,7 +77,7 @@ async def _handle_sse_response(self, response, ctx, is_initialization=False):
7477 print (f"Exception caught: { e } " )
7578 # Here's the bug - response.aclose() is never called!
7679 raise
77-
80+
7881 async def _handle_resumption_request (self , ctx ):
7982 """
8083 Same issue here - the aconnect_sse context manager should handle cleanup,
@@ -85,20 +88,20 @@ async def _handle_resumption_request(self, ctx):
8588 class MockEventSourceWithResponse :
8689 def __init__ (self , response ):
8790 self .response = response
88-
91+
8992 async def __aenter__ (self ):
9093 return self
91-
94+
9295 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
9396 # Context manager exits but response might not be closed
9497 pass
95-
98+
9699 def __aiter__ (self ):
97100 return self
98-
101+
99102 async def __anext__ (self ):
100103 raise Exception ("Resumption SSE parsing failed" )
101-
104+
102105 async with MockEventSourceWithResponse (self .mock_response ) as event_source :
103106 async for sse in event_source :
104107 # This never runs because the exception happens first
@@ -116,25 +119,25 @@ async def test_resource_leak():
116119 """Test the resource leak I found"""
117120 print ("Testing resource leak in streamable_http.py" )
118121 print ("=" * 50 )
119-
122+
120123 transport = MockTransport ()
121-
124+
122125 # Create mock context
123126 class MockContext :
124127 def __init__ (self ):
125128 self .read_stream_writer = None
126129 self .metadata = None
127-
130+
128131 ctx = MockContext ()
129-
132+
130133 print ("\n Testing _handle_sse_response method:" )
131134 print ("-" * 35 )
132-
135+
133136 try :
134137 await transport ._handle_sse_response (transport .mock_response , ctx )
135138 except Exception as e :
136139 print (f"Caught expected exception: { e } " )
137-
140+
138141 # Check if response was closed
139142 if transport .mock_response .closed :
140143 print ("No resource leak - response was closed properly" )
@@ -151,27 +154,27 @@ async def test_resumption_resource_leak():
151154 """Test the resource leak in _handle_resumption_request"""
152155 print ("\n Testing _handle_resumption_request method:" )
153156 print ("-" * 40 )
154-
157+
155158 transport = MockTransport ()
156-
159+
157160 # Create mock context with resumption token
158161 class MockResumptionContext :
159162 def __init__ (self ):
160163 self .read_stream_writer = None
161- self .metadata = type (' obj' , (object ,), {' resumption_token' : ' test-token' })()
162- self .session_message = type ('obj' , ( object ,), {
163- 'message' : type ( ' obj' , ( object ,), {
164- 'root' : type ( 'obj' , ( object ,), { 'id' : 'test-id' })()
165- })()
166- } )()
167-
164+ self .metadata = type (" obj" , (object ,), {" resumption_token" : " test-token" })()
165+ self .session_message = type (
166+ " obj" ,
167+ ( object ,),
168+ { "message" : type ( "obj" , ( object ,), { "root" : type ( "obj" , ( object ,), { "id" : "test-id" })()})()},
169+ )()
170+
168171 ctx_resumption = MockResumptionContext ()
169-
172+
170173 try :
171174 await transport ._handle_resumption_request (ctx_resumption )
172175 except Exception as e :
173176 print (f"Caught expected exception: { e } " )
174-
177+
175178 # Check if response was closed
176179 if transport .mock_response .closed :
177180 print ("No resource leak - response was closed properly" )
@@ -190,15 +193,15 @@ async def main():
190193 print ("This shows the issue I found where HTTP responses don't get closed" )
191194 print ("when SSE streaming fails in the MCP Python SDK." )
192195 print ()
193-
196+
194197 # Test both methods
195198 sse_leak = await test_resource_leak ()
196199 resumption_leak = await test_resumption_resource_leak ()
197-
200+
198201 print ("\n " + "=" * 50 )
199202 print ("SUMMARY:" )
200203 print ("=" * 50 )
201-
204+
202205 if sse_leak and resumption_leak :
203206 print ("All tests passed - no resource leaks detected" )
204207 return 0
0 commit comments