Skip to content

Commit 76fe69d

Browse files
1yticclaude
andcommitted
fix: Clean up sessions from manager when terminated via DELETE request
Previously, StreamableHTTPSessionManager would not remove sessions from its _server_instances dictionary when clients sent DELETE requests to terminate sessions. This caused terminated sessions to remain in memory until server shutdown, leading to potential memory accumulation in long-running servers. This change adds a callback mechanism between StreamableHTTPServerTransport and StreamableHTTPSessionManager to properly clean up terminated sessions: - Add on_session_terminated callback parameter to StreamableHTTPServerTransport - Implement _on_session_terminated method in StreamableHTTPSessionManager - Call callback from _terminate_session when DELETE request terminates session - Update HTTP status codes: return 404 NOT_FOUND for unknown session IDs - Add comprehensive test to verify session cleanup functionality Sessions are now automatically removed from the manager's memory when explicitly terminated via DELETE requests, preventing memory leaks while maintaining backward compatibility. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e80c015 commit 76fe69d

File tree

3 files changed

+121
-4
lines changed

3 files changed

+121
-4
lines changed

src/mcp/server/streamable_http.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def __init__(
137137
mcp_session_id: str | None,
138138
is_json_response_enabled: bool = False,
139139
event_store: EventStore | None = None,
140+
on_session_terminated: Callable[[str], Awaitable[None]] | None = None,
140141
) -> None:
141142
"""
142143
Initialize a new StreamableHTTP server transport.
@@ -149,6 +150,9 @@ def __init__(
149150
event_store: Event store for resumability support. If provided,
150151
resumability will be enabled, allowing clients to
151152
reconnect and resume messages.
153+
on_session_terminated: Optional callback to notify when session is
154+
terminated. Called with the session ID when DELETE
155+
request terminates session.
152156
153157
Raises:
154158
ValueError: If the session ID contains invalid characters.
@@ -163,6 +167,7 @@ def __init__(
163167
self.mcp_session_id = mcp_session_id
164168
self.is_json_response_enabled = is_json_response_enabled
165169
self._event_store = event_store
170+
self._on_session_terminated = on_session_terminated
166171
self._request_streams: dict[
167172
RequestId,
168173
tuple[
@@ -660,6 +665,13 @@ async def _terminate_session(self) -> None:
660665
self._terminated = True
661666
logger.info(f"Terminating session: {self.mcp_session_id}")
662667

668+
# Notify the session manager about termination
669+
if self._on_session_terminated and self.mcp_session_id:
670+
try:
671+
await self._on_session_terminated(self.mcp_session_id)
672+
except Exception as e:
673+
logger.warning(f"Error in session termination callback: {e}")
674+
663675
# We need a copy of the keys to avoid modification during iteration
664676
request_stream_keys = list(self._request_streams.keys())
665677

src/mcp/server/streamable_http_manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
117117
# Clear any remaining server instances
118118
self._server_instances.clear()
119119

120+
async def _on_session_terminated(self, session_id: str) -> None:
121+
"""Callback to clean up terminated sessions from the manager."""
122+
async with self._session_creation_lock:
123+
if session_id in self._server_instances:
124+
logger.info(f"Removing terminated session from manager: {session_id}")
125+
del self._server_instances[session_id]
126+
120127
async def handle_request(
121128
self,
122129
scope: Scope,
@@ -222,6 +229,7 @@ async def _handle_stateful_request(
222229
mcp_session_id=new_session_id,
223230
is_json_response_enabled=self.json_response,
224231
event_store=self.event_store, # May be None (no resumability)
232+
on_session_terminated=self._on_session_terminated,
225233
)
226234

227235
assert http_transport.mcp_session_id is not None
@@ -250,9 +258,9 @@ async def run_server(
250258
# Handle the HTTP request and return the response
251259
await http_transport.handle_request(scope, receive, send)
252260
else:
253-
# Invalid session ID
261+
# Invalid session ID
254262
response = Response(
255-
"Bad Request: No valid session ID provided",
256-
status_code=HTTPStatus.BAD_REQUEST,
263+
"Not Found: Session has been terminated",
264+
status_code=HTTPStatus.NOT_FOUND,
257265
)
258266
await response(scope, receive, send)

tests/server/test_streamable_http_manager.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Tests for StreamableHTTPSessionManager."""
22

3+
import json
4+
35
import anyio
46
import pytest
57

@@ -70,7 +72,7 @@ async def receive():
7072
return {"type": "http.request", "body": b""}
7173

7274
async def send(message):
73-
pass
75+
del message # Suppress unused parameter warning
7476

7577
# Should raise error because run() hasn't been called
7678
with pytest.raises(RuntimeError) as excinfo:
@@ -79,3 +81,98 @@ async def send(message):
7981
assert "Task group is not initialized. Make sure to use run()." in str(
8082
excinfo.value
8183
)
84+
85+
86+
@pytest.mark.anyio
87+
async def test_session_cleanup_on_delete_request():
88+
"""Test sessions are properly cleaned up when DELETE request terminates them."""
89+
app = Server("test-server")
90+
manager = StreamableHTTPSessionManager(app=app, json_response=True, stateless=False)
91+
92+
async with manager.run():
93+
# Create a new session by making a POST request
94+
session_id = None
95+
96+
# Mock ASGI parameters for POST request (session creation)
97+
post_scope = {
98+
"type": "http",
99+
"method": "POST",
100+
"path": "/test",
101+
"headers": [
102+
(b"content-type", b"application/json"),
103+
(b"accept", b"application/json, text/event-stream"),
104+
],
105+
}
106+
107+
# Mock initialization request
108+
init_request = {
109+
"jsonrpc": "2.0",
110+
"id": 1,
111+
"method": "initialize",
112+
"params": {
113+
"protocolVersion": "2024-11-05",
114+
"capabilities": {},
115+
"clientInfo": {"name": "test-client", "version": "1.0.0"},
116+
},
117+
}
118+
119+
post_body = json.dumps(init_request).encode()
120+
post_request_body_sent = False
121+
122+
async def post_receive():
123+
nonlocal post_request_body_sent
124+
if not post_request_body_sent:
125+
post_request_body_sent = True
126+
return {"type": "http.request", "body": post_body}
127+
else:
128+
return {"type": "http.request", "body": b""}
129+
130+
response_data = {}
131+
132+
async def post_send(message):
133+
if message["type"] == "http.response.start":
134+
response_data["status"] = message["status"]
135+
response_data["headers"] = dict(message.get("headers", []))
136+
elif message["type"] == "http.response.body":
137+
response_data["body"] = message.get("body", b"")
138+
139+
# Make POST request to create session
140+
await manager.handle_request(post_scope, post_receive, post_send)
141+
142+
# Extract session ID from response headers
143+
session_id = response_data["headers"].get(b"mcp-session-id")
144+
if session_id:
145+
session_id = session_id.decode()
146+
147+
# Verify session was created
148+
assert session_id is not None
149+
assert session_id in manager._server_instances
150+
151+
# Now make DELETE request to terminate session
152+
delete_scope = {
153+
"type": "http",
154+
"method": "DELETE",
155+
"path": "/test",
156+
"headers": [(b"mcp-session-id", session_id.encode())],
157+
}
158+
159+
async def delete_receive():
160+
return {"type": "http.request", "body": b""}
161+
162+
delete_response_data = {}
163+
164+
async def delete_send(message):
165+
if message["type"] == "http.response.start":
166+
delete_response_data["status"] = message["status"]
167+
168+
# Make DELETE request
169+
await manager.handle_request(delete_scope, delete_receive, delete_send)
170+
171+
# Verify DELETE request succeeded
172+
assert delete_response_data["status"] == 200
173+
174+
# Give a moment for the callback to execute
175+
await anyio.sleep(0.01)
176+
177+
# Verify session was removed from manager
178+
assert session_id not in manager._server_instances

0 commit comments

Comments
 (0)