Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,8 +773,12 @@ async def terminate(self) -> None:
"""Terminate the current session, closing all streams.

Once terminated, all requests with this session ID will receive 404 Not Found.
Calling this method multiple times is safe (idempotent).
"""

if self._terminated:
return

self._terminated = True
logger.info(f"Terminating session: {self.mcp_session_id}")

Expand Down
75 changes: 75 additions & 0 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
2. Resumability via an optional event store
3. Connection management and lifecycle
4. Request handling and transport setup
5. Idle session cleanup via optional timeout

Important: Only one StreamableHTTPSessionManager instance should be created
per application. The instance cannot be reused after its run() context has
Expand All @@ -55,6 +56,22 @@ class StreamableHTTPSessionManager:
security_settings: Optional transport security settings.
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
retry field. Used for SSE polling behavior.
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
If set, sessions that receive no HTTP requests for this
duration will be automatically terminated and removed.
When retry_interval is also set, the effective idle
threshold is at least ``retry_interval / 1000 * 3`` to
avoid prematurely reaping sessions that are simply
waiting for SSE polling reconnections. Default is None
(no timeout). A value of 1800 (30 minutes) is
recommended for most deployments.

Note: The idle timer is based on incoming HTTP requests
(POST, GET, DELETE), not on whether SSE connections are
open. If clients maintain long-lived GET SSE streams
without sending other requests, set this value higher
than the longest expected SSE connection lifetime to
avoid premature reaping.
"""

def __init__(
Expand All @@ -65,17 +82,25 @@ def __init__(
stateless: bool = False,
security_settings: TransportSecuritySettings | None = None,
retry_interval: int | None = None,
session_idle_timeout: float | None = None,
):
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise ValueError("session_idle_timeout is not supported in stateless mode")

self.app = app
self.event_store = event_store
self.json_response = json_response
self.stateless = stateless
self.security_settings = security_settings
self.retry_interval = retry_interval
self.session_idle_timeout = session_idle_timeout

# Session tracking (only used if not stateless)
self._session_creation_lock = anyio.Lock()
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
self._last_activity: dict[str, float] = {}

# The task group will be set during lifespan
self._task_group = None
Expand Down Expand Up @@ -114,6 +139,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
# Store the task group for later use
self._task_group = tg
logger.info("StreamableHTTP session manager started")

# Start idle session reaper if timeout is configured
if self.session_idle_timeout is not None:
tg.start_soon(self._idle_session_reaper)

try:
yield # Let the application run
finally:
Expand All @@ -123,6 +153,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
self._task_group = None
# Clear any remaining server instances
self._server_instances.clear()
self._last_activity.clear()

async def handle_request(
self,
Expand Down Expand Up @@ -219,6 +250,8 @@ async def _handle_stateful_request(
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
transport = self._server_instances[request_mcp_session_id]
logger.debug("Session already exists, handling request directly")
# Update activity timestamp for idle timeout tracking
self._last_activity[request_mcp_session_id] = anyio.current_time()
await transport.handle_request(scope, receive, send)
return

Expand All @@ -237,6 +270,7 @@ async def _handle_stateful_request(

assert http_transport.mcp_session_id is not None
self._server_instances[http_transport.mcp_session_id] = http_transport
self._last_activity[http_transport.mcp_session_id] = anyio.current_time()
logger.info(f"Created new transport with session ID: {new_session_id}")

# Define the server runner
Expand Down Expand Up @@ -269,6 +303,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
"active instances."
)
del self._server_instances[http_transport.mcp_session_id]
self._last_activity.pop(http_transport.mcp_session_id, None)

# Assert task group is not None for type checking
assert self._task_group is not None
Expand All @@ -295,3 +330,43 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
media_type="application/json",
)
await response(scope, receive, send)

def _effective_idle_timeout(self) -> float:
    """Return the idle threshold, in seconds, that should actually be enforced.

    The configured ``session_idle_timeout`` is returned unchanged unless
    ``retry_interval`` (milliseconds) is also set. In that case the result
    is never smaller than three retry intervals converted to seconds, so
    the ordinary quiet gap between SSE polling reconnections is not
    mistaken for an idle session.

    Returns:
        The larger of ``session_idle_timeout`` and
        ``retry_interval / 1000 * 3``, or just ``session_idle_timeout``
        when no retry interval is configured.
    """
    configured = self.session_idle_timeout
    assert configured is not None
    if self.retry_interval is None:
        return configured
    # Lower bound derived from the polling cadence: three retry periods.
    polling_floor = (self.retry_interval / 1000.0) * 3
    return max(configured, polling_floor)

async def _idle_session_reaper(self) -> None:
    """Background task that periodically terminates idle sessions.

    The effective timeout is computed once at start-up. The task then loops
    indefinitely (it has no exit condition of its own; presumably it is
    stopped by cancelling the task group it was started in): it sleeps for
    ``scan_interval`` seconds -- half the effective timeout, capped at 30
    seconds -- and then walks every tracked session. A session whose last
    recorded activity is older than the timeout is removed from
    ``_server_instances`` and ``_last_activity`` and its transport is
    terminated.

    A failure while terminating one session is logged and skipped so that
    it neither stops the reaper nor escapes into the surrounding task
    group; the session has already been removed from the tracking dicts at
    that point.
    """
    timeout = self._effective_idle_timeout()
    scan_interval = min(timeout / 2, 30.0)
    logger.info(f"Idle session reaper started (timeout={timeout}s, scan_interval={scan_interval}s)")

    while True:
        await anyio.sleep(scan_interval)
        now = anyio.current_time()
        # Iterate over a snapshot: entries are popped below, and other tasks
        # may add or remove sessions while we await terminate().
        for session_id in list(self._server_instances):
            last = self._last_activity.get(session_id)
            if last is None:
                continue  # pragma: no cover
            idle_for = now - last
            if idle_for <= timeout:
                continue
            transport = self._server_instances.get(session_id)
            if transport is None:
                # Session disappeared while an earlier terminate() was awaited.
                continue  # pragma: no cover
            logger.info(
                f"Terminating idle session {session_id} (idle for {idle_for:.1f}s, timeout={timeout}s)"
            )
            # Drop the bookkeeping first so new requests carrying this
            # session ID no longer find the transport.
            self._server_instances.pop(session_id, None)
            self._last_activity.pop(session_id, None)
            try:
                await transport.terminate()
            except Exception:
                # Cancellation is not an ``Exception`` subclass, so shutdown
                # still propagates; only genuine termination errors are
                # contained here.
                logger.exception(f"Error terminating idle session {session_id}")
Loading
Loading