From 96f363b69c64bcdbf9f80f7c3a0a1e79b7889f36 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 9 Feb 2026 16:55:38 +0000 Subject: [PATCH 1/3] Add idle session timeout to StreamableHTTPSessionManager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sessions created via StreamableHTTPSessionManager persist indefinitely in _server_instances even after the client disconnects, leaking memory over time. Add a session_idle_timeout parameter that automatically terminates and removes sessions that receive no HTTP requests for the configured duration. Each session manages its own lifetime via an anyio CancelScope deadline — no background reaper task needed. Incoming requests push the deadline forward to keep active sessions alive. Github-Issue: #1283 --- src/mcp/server/streamable_http.py | 2 + src/mcp/server/streamable_http_manager.py | 49 +++- .../issues/test_1283_idle_session_cleanup.py | 267 ++++++++++++++++++ 3 files changed, 311 insertions(+), 7 deletions(-) create mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index 54ac7374a..55a0efd9c 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -169,6 +169,8 @@ def __init__( ] = {} self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {} self._terminated = False + # Idle timeout cancel scope; managed by the session manager. + self.idle_scope: anyio.CancelScope | None = None @property def is_terminated(self) -> bool: diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 8eb29c4d4..ee7fe4aeb 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -39,6 +39,7 @@ class StreamableHTTPSessionManager: 2. Resumability via an optional event store 3. Connection management and lifecycle 4. Request handling and transport setup + 5. Idle session cleanup via optional timeout Important: Only one StreamableHTTPSessionManager instance should be created per application. The instance cannot be reused after its run() context has @@ -56,6 +57,14 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. + If set, sessions that receive no HTTP requests for this + duration will be automatically terminated and removed. + When retry_interval is also configured, ensure the idle + timeout comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -66,13 +75,20 @@ def __init__( stateless: bool = False, security_settings: TransportSecuritySettings | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, ): + if session_idle_timeout is not None and session_idle_timeout <= 0: + raise ValueError("session_idle_timeout must be a positive number of seconds") + if stateless and session_idle_timeout is not None: + raise ValueError("session_idle_timeout is not supported in stateless mode") + self.app = app self.event_store = event_store self.json_response = json_response self.stateless = stateless self.security_settings = security_settings self.retry_interval = retry_interval + self.session_idle_timeout = session_idle_timeout # Session tracking (only used if not stateless) self._session_creation_lock = anyio.Lock() @@ -184,6 +200,9 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") + # Push back idle deadline on activity + if transport.idle_scope is not None and self.session_idle_timeout is not None: + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout await transport.handle_request(scope, receive, send) return @@ -210,16 +229,32 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE read_stream, write_stream = streams task_status.started() try: - await self.app.run( - read_stream, - write_stream, - self.app.create_initialization_options(), - stateless=False, # Stateful mode - ) + # Use a cancel scope for idle timeout — when the + # deadline passes the scope cancels app.run() and + # execution continues after the ``with`` block. + # Incoming requests push the deadline forward. + idle_scope = anyio.CancelScope() + if self.session_idle_timeout is not None: + idle_scope.deadline = anyio.current_time() + self.session_idle_timeout + http_transport.idle_scope = idle_scope + + with idle_scope: + await self.app.run( + read_stream, + write_stream, + self.app.create_initialization_options(), + stateless=False, + ) + + if idle_scope.cancelled_caught: + session_id = http_transport.mcp_session_id + logger.info(f"Session {session_id} idle timeout") + if session_id is not None: # pragma: no branch + self._server_instances.pop(session_id, None) + await http_transport.terminate() except Exception: logger.exception(f"Session {http_transport.mcp_session_id} crashed") finally: - # Only remove from instances if not terminated if ( # pragma: no branch http_transport.mcp_session_id and http_transport.mcp_session_id in self._server_instances diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py new file mode 100644 index 000000000..9a9193786 --- /dev/null +++ b/tests/issues/test_1283_idle_session_cleanup.py @@ -0,0 +1,267 @@ +"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. + +Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager +persist indefinitely in ``_server_instances`` even after the client disconnects. +Over time this leaks memory. + +The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows +the manager to automatically terminate and remove sessions that have been idle for +longer than the configured duration. +""" + +import time +from collections.abc import Callable, Coroutine +from typing import Any + +import anyio +import pytest +from starlette.types import Message, Scope + +from mcp.server.lowlevel import Server +from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport +from mcp.server.streamable_http_manager import StreamableHTTPSessionManager + + +def _make_scope() -> Scope: + return { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + +async def _mock_receive() -> Message: # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + +def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: + async def mock_send(message: Message) -> None: + sent.append(message) + + return mock_send + + +def _extract_session_id(sent_messages: list[Message]) -> str: + for msg in sent_messages: + if msg["type"] == "http.response.start": # pragma: no branch + for name, value in msg.get("headers", []): # pragma: no branch + if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch + return value.decode() + raise AssertionError("Session ID not found in response headers") # pragma: no cover + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Session should be removed from _server_instances after idle timeout.""" + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + assert session_id in manager._server_instances + + # Wait for the cancel scope deadline to fire + await anyio.sleep(0.4) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_activity_resets_idle_timer(): + """Requests during the timeout window should prevent the session from being reaped.""" + app = Server("test-idle-reset") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + # Simulate ongoing activity by pushing back the idle scope deadline + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + for _ in range(4): + await anyio.sleep(0.1) + transport.idle_scope.deadline = anyio.current_time() + 0.3 + + # Session should still be alive because we kept it active + assert session_id in manager._server_instances + + # Now stop activity and let the timeout expire + await anyio.sleep(0.6) + + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_multiple_sessions_reaped_independently(): + """Each session tracks its own idle timeout independently.""" + app = Server("test-multi-idle") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) + + async with manager.run(): + sent1: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) + session_id_1 = _extract_session_id(sent1) + + await anyio.sleep(0.05) + sent2: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) + session_id_2 = _extract_session_id(sent2) + + assert session_id_1 in manager._server_instances + assert session_id_2 in manager._server_instances + + # After enough time, both should be reaped + await anyio.sleep(0.4) + + assert session_id_1 not in manager._server_instances + assert session_id_2 not in manager._server_instances + + +def test_session_idle_timeout_rejects_negative(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + + +def test_session_idle_timeout_rejects_zero(): + """session_idle_timeout must be a positive number.""" + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + """session_idle_timeout is not supported in stateless mode.""" + with pytest.raises(ValueError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_terminate_idempotency(): + """Calling terminate() multiple times should be safe.""" + transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") + + async with transport.connect(): + await transport.terminate() + assert transport.is_terminated + + # Second call should be a no-op (no exception) + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.anyio +async def test_no_idle_timeout_sessions_persist(): + """When session_idle_timeout is None (default), sessions persist indefinitely.""" + app = Server("test-no-timeout") + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + + await anyio.sleep(0.3) + assert session_id in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_exits_promptly_after_idle_timeout(): + """The run_server task must exit shortly after the idle timeout fires.""" + app = Server("test-lifecycle") + + task_exited = anyio.Event() + exit_timestamp: list[float] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + try: + await original_run(*args, **kwargs) + finally: + exit_timestamp.append(time.monotonic()) + task_exited.set() + + app.run = instrumented_run # type: ignore[assignment] + + idle_timeout = 0.5 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + pre_reap_time = time.monotonic() + + with anyio.fail_after(idle_timeout * 4): + await task_exited.wait() + + assert len(exit_timestamp) == 1 + total_elapsed = exit_timestamp[0] - pre_reap_time + assert total_elapsed < idle_timeout * 3, ( + f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" + ) + assert session_id not in manager._server_instances + + +@pytest.mark.anyio +async def test_run_server_finally_block_runs_after_terminate(): + """Verify that the finally block in run_server executes after terminate().""" + app = Server("test-finally") + + lifecycle_events: list[str] = [] + original_run = app.run + + async def instrumented_run(*args: Any, **kwargs: Any) -> None: + lifecycle_events.append("run_entered") + try: + await original_run(*args, **kwargs) + finally: + lifecycle_events.append("run_exited") + + app.run = instrumented_run # type: ignore[assignment] + + manager = StreamableHTTPSessionManager(app=app) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + transport = manager._server_instances[session_id] + + assert "run_entered" in lifecycle_events + assert "run_exited" not in lifecycle_events + + await transport.terminate() + + with anyio.fail_after(3.0): + while "run_exited" not in lifecycle_events: + await anyio.sleep(0.01) + + assert "run_exited" in lifecycle_events + + +@pytest.mark.anyio +async def test_idle_timeout_end_to_end(): + """End-to-end: idle timeout causes session cleanup with a real Server.""" + app = Server("test-e2e") + idle_timeout = 0.3 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent: list[Message] = [] + await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) + session_id = _extract_session_id(sent) + assert session_id in manager._server_instances + + with anyio.fail_after(idle_timeout + 1.0): + while session_id in manager._server_instances: + await anyio.sleep(0.05) + + assert session_id not in manager._server_instances From 8aef0396bb3684eaa66b7236c77694c5bb799270 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 16 Feb 2026 11:54:50 +0000 Subject: [PATCH 2/3] Address review feedback: docstring formatting, test reorganization - Reformat session_idle_timeout docstring to use 4-space continuation indent and fill to 120 chars - Move idle timeout tests from tests/issues/ into tests/server/test_streamable_http_manager.py alongside existing session manager tests - Remove redundant/unnecessary tests (6 dropped, 5 kept) - Replace sleep-based test assertions with event-based polling using anyio.fail_after for deterministic behavior Github-Issue: #1283 --- CLAUDE.md | 7 +- src/mcp/server/streamable_http_manager.py | 19 +- .../issues/test_1283_idle_session_cleanup.py | 267 ------------------ tests/server/test_streamable_http_manager.py | 71 +++++ 4 files changed, 85 insertions(+), 279 deletions(-) delete mode 100644 tests/issues/test_1283_idle_session_cleanup.py diff --git a/CLAUDE.md b/CLAUDE.md index 0913b7d8e..7b66b6ab0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -28,9 +28,10 @@ This document contains critical information about working with this codebase. Fo - Bug fixes require regression tests - IMPORTANT: The `tests/client/test_client.py` is the most well designed test file. Follow its patterns. - IMPORTANT: Be minimal, and focus on E2E tests: Use the `mcp.client.Client` whenever possible. - - IMPORTANT: Before pushing, verify 100% branch coverage on changed files by running - `uv run --frozen pytest -x` (coverage is configured in `pyproject.toml` with `fail_under = 100` - and `branch = true`). If any branch is uncovered, add a test for it before pushing. + - NEVER use `anyio.sleep()` with a fixed duration as a synchronization mechanism. Instead: + - Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test + - For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()` + - Wrap waits in `anyio.fail_after(5)` as a timeout guard Test files mirror the source tree: `src/mcp/client/streamable_http.py` → `tests/client/test_streamable_http.py` Add tests to the existing file for that module. diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index ee7fe4aeb..d400a2d8b 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -57,14 +57,15 @@ class StreamableHTTPSessionManager: security_settings: Optional transport security settings. retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE polling behavior. - session_idle_timeout: Optional idle timeout in seconds for stateful sessions. - If set, sessions that receive no HTTP requests for this - duration will be automatically terminated and removed. - When retry_interval is also configured, ensure the idle - timeout comfortably exceeds the retry interval to avoid - reaping sessions during normal SSE polling gaps. - Default is None (no timeout). A value of 1800 - (30 minutes) is recommended for most deployments. + session_idle_timeout: Optional idle timeout in seconds for stateful + sessions. If set, sessions that receive no HTTP + requests for this duration will be automatically + terminated and removed. When retry_interval is + also configured, ensure the idle timeout + comfortably exceeds the retry interval to avoid + reaping sessions during normal SSE polling gaps. + Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -202,7 +203,7 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S logger.debug("Session already exists, handling request directly") # Push back idle deadline on activity if transport.idle_scope is not None and self.session_idle_timeout is not None: - transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover await transport.handle_request(scope, receive, send) return diff --git a/tests/issues/test_1283_idle_session_cleanup.py b/tests/issues/test_1283_idle_session_cleanup.py deleted file mode 100644 index 9a9193786..000000000 --- a/tests/issues/test_1283_idle_session_cleanup.py +++ /dev/null @@ -1,267 +0,0 @@ -"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up. - -Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager -persist indefinitely in ``_server_instances`` even after the client disconnects. -Over time this leaks memory. - -The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows -the manager to automatically terminate and remove sessions that have been idle for -longer than the configured duration. -""" - -import time -from collections.abc import Callable, Coroutine -from typing import Any - -import anyio -import pytest -from starlette.types import Message, Scope - -from mcp.server.lowlevel import Server -from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport -from mcp.server.streamable_http_manager import StreamableHTTPSessionManager - - -def _make_scope() -> Scope: - return { - "type": "http", - "method": "POST", - "path": "/mcp", - "headers": [(b"content-type", b"application/json")], - } - - -async def _mock_receive() -> Message: # pragma: no cover - return {"type": "http.request", "body": b"", "more_body": False} - - -def _make_send(sent: list[Message]) -> Callable[[Message], Coroutine[Any, Any, None]]: - async def mock_send(message: Message) -> None: - sent.append(message) - - return mock_send - - -def _extract_session_id(sent_messages: list[Message]) -> str: - for msg in sent_messages: - if msg["type"] == "http.response.start": # pragma: no branch - for name, value in msg.get("headers", []): # pragma: no branch - if name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): # pragma: no branch - return value.decode() - raise AssertionError("Session ID not found in response headers") # pragma: no cover - - -@pytest.mark.anyio -async def test_idle_session_is_reaped(): - """Session should be removed from _server_instances after idle timeout.""" - app = Server("test-idle-reap") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - assert session_id in manager._server_instances - - # Wait for the cancel scope deadline to fire - await anyio.sleep(0.4) - - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_activity_resets_idle_timer(): - """Requests during the timeout window should prevent the session from being reaped.""" - app = Server("test-idle-reset") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.3) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - # Simulate ongoing activity by pushing back the idle scope deadline - transport = manager._server_instances[session_id] - assert transport.idle_scope is not None - for _ in range(4): - await anyio.sleep(0.1) - transport.idle_scope.deadline = anyio.current_time() + 0.3 - - # Session should still be alive because we kept it active - assert session_id in manager._server_instances - - # Now stop activity and let the timeout expire - await anyio.sleep(0.6) - - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_multiple_sessions_reaped_independently(): - """Each session tracks its own idle timeout independently.""" - app = Server("test-multi-idle") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.15) - - async with manager.run(): - sent1: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1)) - session_id_1 = _extract_session_id(sent1) - - await anyio.sleep(0.05) - sent2: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2)) - session_id_2 = _extract_session_id(sent2) - - assert session_id_1 in manager._server_instances - assert session_id_2 in manager._server_instances - - # After enough time, both should be reaped - await anyio.sleep(0.4) - - assert session_id_1 not in manager._server_instances - assert session_id_2 not in manager._server_instances - - -def test_session_idle_timeout_rejects_negative(): - """session_idle_timeout must be a positive number.""" - with pytest.raises(ValueError, match="positive number"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) - - -def test_session_idle_timeout_rejects_zero(): - """session_idle_timeout must be a positive number.""" - with pytest.raises(ValueError, match="positive number"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) - - -def test_session_idle_timeout_rejects_stateless(): - """session_idle_timeout is not supported in stateless mode.""" - with pytest.raises(ValueError, match="not supported in stateless"): - StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) - - -@pytest.mark.anyio -async def test_terminate_idempotency(): - """Calling terminate() multiple times should be safe.""" - transport = StreamableHTTPServerTransport(mcp_session_id="test-idempotent") - - async with transport.connect(): - await transport.terminate() - assert transport.is_terminated - - # Second call should be a no-op (no exception) - await transport.terminate() - assert transport.is_terminated - - -@pytest.mark.anyio -async def test_no_idle_timeout_sessions_persist(): - """When session_idle_timeout is None (default), sessions persist indefinitely.""" - app = Server("test-no-timeout") - manager = StreamableHTTPSessionManager(app=app) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - - await anyio.sleep(0.3) - assert session_id in manager._server_instances - - -@pytest.mark.anyio -async def test_run_server_exits_promptly_after_idle_timeout(): - """The run_server task must exit shortly after the idle timeout fires.""" - app = Server("test-lifecycle") - - task_exited = anyio.Event() - exit_timestamp: list[float] = [] - original_run = app.run - - async def instrumented_run(*args: Any, **kwargs: Any) -> None: - try: - await original_run(*args, **kwargs) - finally: - exit_timestamp.append(time.monotonic()) - task_exited.set() - - app.run = instrumented_run # type: ignore[assignment] - - idle_timeout = 0.5 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - assert session_id in manager._server_instances - - pre_reap_time = time.monotonic() - - with anyio.fail_after(idle_timeout * 4): - await task_exited.wait() - - assert len(exit_timestamp) == 1 - total_elapsed = exit_timestamp[0] - pre_reap_time - assert total_elapsed < idle_timeout * 3, ( - f"run_server task took {total_elapsed:.3f}s to exit; expected < {idle_timeout * 3:.1f}s" - ) - assert session_id not in manager._server_instances - - -@pytest.mark.anyio -async def test_run_server_finally_block_runs_after_terminate(): - """Verify that the finally block in run_server executes after terminate().""" - app = Server("test-finally") - - lifecycle_events: list[str] = [] - original_run = app.run - - async def instrumented_run(*args: Any, **kwargs: Any) -> None: - lifecycle_events.append("run_entered") - try: - await original_run(*args, **kwargs) - finally: - lifecycle_events.append("run_exited") - - app.run = instrumented_run # type: ignore[assignment] - - manager = StreamableHTTPSessionManager(app=app) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - transport = manager._server_instances[session_id] - - assert "run_entered" in lifecycle_events - assert "run_exited" not in lifecycle_events - - await transport.terminate() - - with anyio.fail_after(3.0): - while "run_exited" not in lifecycle_events: - await anyio.sleep(0.01) - - assert "run_exited" in lifecycle_events - - -@pytest.mark.anyio -async def test_idle_timeout_end_to_end(): - """End-to-end: idle timeout causes session cleanup with a real Server.""" - app = Server("test-e2e") - idle_timeout = 0.3 - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) - - async with manager.run(): - sent: list[Message] = [] - await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent)) - session_id = _extract_session_id(sent) - assert session_id in manager._server_instances - - with anyio.fail_after(idle_timeout + 1.0): - while session_id in manager._server_instances: - await anyio.sleep(0.05) - - assert session_id not in manager._server_instances diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 475eaa167..3bd31f0f3 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -333,3 +333,74 @@ async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestP Client(streamable_http_client(f"http://{host}/mcp", http_client=http_client)) as client, ): await client.list_tools() + + +@pytest.mark.anyio +async def test_idle_session_is_reaped(): + """Idle timeout sets a cancel scope deadline and reaps the session when it fires.""" + idle_timeout = 300 + app = Server("test-idle-reap") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + + async with manager.run(): + sent_messages: list[Message] = [] + + async def mock_send(message: Message): + sent_messages.append(message) + + scope = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + async def mock_receive(): # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + before = anyio.current_time() + await manager.handle_request(scope, mock_receive, mock_send) + + session_id = None + for msg in sent_messages: # pragma: no branch + if msg["type"] == "http.response.start": # pragma: no branch + for header_name, header_value in msg.get("headers", []): # pragma: no branch + if header_name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): + session_id = header_value.decode() + break + if session_id: # pragma: no branch + break + + assert session_id is not None, "Session ID not found in response headers" + assert session_id in manager._server_instances + + # Verify the idle deadline was set correctly + transport = manager._server_instances[session_id] + assert transport.idle_scope is not None + assert transport.idle_scope.deadline >= before + idle_timeout + + # Simulate time passing by expiring the deadline + transport.idle_scope.deadline = anyio.current_time() + + with anyio.fail_after(5): + while session_id in manager._server_instances: + await anyio.sleep(0) + + assert session_id not in manager._server_instances + + # Verify terminate() is idempotent + await transport.terminate() + assert transport.is_terminated + + +@pytest.mark.parametrize( + "kwargs,match", + [ + ({"session_idle_timeout": -1}, "positive number"), + ({"session_idle_timeout": 0}, "positive number"), + ({"session_idle_timeout": 30, "stateless": True}, "not supported in stateless"), + ], +) +def test_session_idle_timeout_validation(kwargs: dict[str, Any], match: str): + with pytest.raises(ValueError, match=match): + StreamableHTTPSessionManager(app=Server("test"), **kwargs) From 2721c9ba6bb59e4e60cf55cbace10400372c140e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Mon, 16 Feb 2026 13:18:49 +0000 Subject: [PATCH 3/3] Address latest review feedback - Fix docstring indentation: use 4-space continuation indent filled to 120 cols consistently across all Args parameters - Change stateless+idle_timeout error from ValueError to RuntimeError - Remove unnecessary None guard on session_id in idle timeout cleanup - Replace while+sleep(0) polling with anyio.Event in test Github-Issue: #1283 --- CLAUDE.md | 8 +- src/mcp/server/streamable_http_manager.py | 38 ++++------ tests/server/test_streamable_http_manager.py | 78 +++++++++++--------- 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 7b66b6ab0..e48ce6e70 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -28,10 +28,14 @@ This document contains critical information about working with this codebase. Fo - Bug fixes require regression tests - IMPORTANT: The `tests/client/test_client.py` is the most well designed test file. Follow its patterns. - IMPORTANT: Be minimal, and focus on E2E tests: Use the `mcp.client.Client` whenever possible. - - NEVER use `anyio.sleep()` with a fixed duration as a synchronization mechanism. Instead: + - IMPORTANT: Before pushing, verify 100% branch coverage on changed files by running + `uv run --frozen pytest -x` (coverage is configured in `pyproject.toml` with `fail_under = 100` + and `branch = true`). If any branch is uncovered, add a test for it before pushing. + - Avoid `anyio.sleep()` with a fixed duration to wait for async operations. Instead: - Use `anyio.Event` — set it in the callback/handler, `await event.wait()` in the test - For stream messages, use `await stream.receive()` instead of `sleep()` + `receive_nowait()` - - Wrap waits in `anyio.fail_after(5)` as a timeout guard + - Exception: `sleep()` is appropriate when testing time-based features (e.g., timeouts) + - Wrap indefinite waits (`event.wait()`, `stream.receive()`) in `anyio.fail_after(5)` to prevent hangs Test files mirror the source tree: `src/mcp/client/streamable_http.py` → `tests/client/test_streamable_http.py` Add tests to the existing file for that module. diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index d400a2d8b..bac1e8107 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -47,25 +47,20 @@ class StreamableHTTPSessionManager: Args: app: The MCP server instance - event_store: Optional event store for resumability support. - If provided, enables resumable connections where clients - can reconnect and receive missed events. - If None, sessions are still tracked but not resumable. + event_store: Optional event store for resumability support. If provided, enables resumable connections + where clients can reconnect and receive missed events. If None, sessions are still tracked but not + resumable. json_response: Whether to use JSON responses instead of SSE streams - stateless: If True, creates a completely fresh transport for each request - with no session tracking or state persistence between requests. + stateless: If True, creates a completely fresh transport for each request with no session tracking or + state persistence between requests. security_settings: Optional transport security settings. - retry_interval: Retry interval in milliseconds to suggest to clients in SSE - retry field. Used for SSE polling behavior. - session_idle_timeout: Optional idle timeout in seconds for stateful - sessions. If set, sessions that receive no HTTP - requests for this duration will be automatically - terminated and removed. When retry_interval is - also configured, ensure the idle timeout - comfortably exceeds the retry interval to avoid - reaping sessions during normal SSE polling gaps. - Default is None (no timeout). A value of 1800 - (30 minutes) is recommended for most deployments. + retry_interval: Retry interval in milliseconds to suggest to clients in SSE retry field. Used for SSE + polling behavior. + session_idle_timeout: Optional idle timeout in seconds for stateful sessions. If set, sessions that + receive no HTTP requests for this duration will be automatically terminated and removed. When + retry_interval is also configured, ensure the idle timeout comfortably exceeds the retry interval to + avoid reaping sessions during normal SSE polling gaps. Default is None (no timeout). A value of 1800 + (30 minutes) is recommended for most deployments. """ def __init__( @@ -81,7 +76,7 @@ def __init__( if session_idle_timeout is not None and session_idle_timeout <= 0: raise ValueError("session_idle_timeout must be a positive number of seconds") if stateless and session_idle_timeout is not None: - raise ValueError("session_idle_timeout is not supported in stateless mode") + raise RuntimeError("session_idle_timeout is not supported in stateless mode") self.app = app self.event_store = event_store @@ -248,10 +243,9 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE ) if idle_scope.cancelled_caught: - session_id = http_transport.mcp_session_id - logger.info(f"Session {session_id} idle timeout") - if session_id is not None: # pragma: no branch - self._server_instances.pop(session_id, None) + assert http_transport.mcp_session_id is not None + logger.info(f"Session {http_transport.mcp_session_id} idle timeout") + self._server_instances.pop(http_transport.mcp_session_id, None) await http_transport.terminate() except Exception: logger.exception(f"Session {http_transport.mcp_session_id} crashed") diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 3bd31f0f3..fe33813ee 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -337,10 +337,9 @@ async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestP @pytest.mark.anyio async def test_idle_session_is_reaped(): - """Idle timeout sets a cancel scope deadline and reaps the session when it fires.""" - idle_timeout = 300 + """After idle timeout fires, the session returns 404.""" app = Server("test-idle-reap") - manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=idle_timeout) + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=0.05) async with manager.run(): sent_messages: list[Message] = [] @@ -358,7 +357,6 @@ async def mock_send(message: Message): async def mock_receive(): # pragma: no cover return {"type": "http.request", "body": b"", "more_body": False} - before = anyio.current_time() await manager.handle_request(scope, mock_receive, mock_send) session_id = None @@ -372,35 +370,43 @@ async def mock_receive(): # pragma: no cover break assert session_id is not None, "Session ID not found in response headers" - assert session_id in manager._server_instances - - # Verify the idle deadline was set correctly - transport = manager._server_instances[session_id] - assert transport.idle_scope is not None - assert transport.idle_scope.deadline >= before + idle_timeout - - # Simulate time passing by expiring the deadline - transport.idle_scope.deadline = anyio.current_time() - - with anyio.fail_after(5): - while session_id in manager._server_instances: - await anyio.sleep(0) - - assert session_id not in manager._server_instances - - # Verify terminate() is idempotent - await transport.terminate() - assert transport.is_terminated - - -@pytest.mark.parametrize( - "kwargs,match", - [ - ({"session_idle_timeout": -1}, "positive number"), - ({"session_idle_timeout": 0}, "positive number"), - ({"session_idle_timeout": 30, "stateless": True}, "not supported in stateless"), - ], -) -def test_session_idle_timeout_validation(kwargs: dict[str, Any], match: str): - with pytest.raises(ValueError, match=match): - StreamableHTTPSessionManager(app=Server("test"), **kwargs) + + # Wait for the 50ms idle timeout to fire and cleanup to complete + await anyio.sleep(0.1) + + # Verify via public API: old session ID now returns 404 + response_messages: list[Message] = [] + + async def capture_send(message: Message): + response_messages.append(message) + + scope_with_session = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [ + (b"content-type", b"application/json"), + (b"mcp-session-id", session_id.encode()), + ], + } + + await manager.handle_request(scope_with_session, mock_receive, capture_send) + + response_start = next( + (msg for msg in response_messages if msg["type"] == "http.response.start"), + None, + ) + assert response_start is not None + assert response_start["status"] == 404 + + +def test_session_idle_timeout_rejects_non_positive(): + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=-1) + with pytest.raises(ValueError, match="positive number"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=0) + + +def test_session_idle_timeout_rejects_stateless(): + with pytest.raises(RuntimeError, match="not supported in stateless"): + StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)