Skip to content

Commit 3441e82

Browse files
Add TDD stubs and failing tests for SSE polling (close_sse_stream)
This commit adds the API stubs and failing tests for the server-side disconnect feature that enables SSE polling. When implemented, this will allow servers to disconnect SSE streams without terminating them, triggering client reconnection for polling patterns. API stubs added: - CloseSSEStreamCallback type in message.py - close_sse_stream field in ServerMessageMetadata and RequestContext - close_sse_stream() stub in StreamableHTTPServerTransport - close_sse_stream() stub in FastMCP Context - retry_interval parameter in session manager and transport Tests added (all expected to fail until implementation): - test_streamablehttp_client_receives_priming_event - test_server_close_sse_stream_via_context - test_streamablehttp_client_auto_reconnects - test_streamablehttp_client_respects_retry_interval - test_streamablehttp_sse_polling_full_cycle - test_streamablehttp_events_replayed_after_disconnect Github-Issue:#1699
1 parent 5983a65 commit 3441e82

File tree

6 files changed

+380
-11
lines changed

6 files changed

+380
-11
lines changed

src/mcp/server/fastmcp/server.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1282,6 +1282,25 @@ def session(self):
12821282
"""Access to the underlying session for advanced usage."""
12831283
return self.request_context.session
12841284

1285+
async def close_sse_stream(self) -> None:
1286+
"""Close the SSE stream to trigger client reconnection.
1287+
1288+
This method closes the HTTP connection for the current request, triggering
1289+
client reconnection. Events continue to be stored in the event store and will
1290+
be replayed when the client reconnects with Last-Event-ID.
1291+
1292+
Use this to implement polling behavior during long-running operations -
1293+
client will reconnect after the retry interval specified in the priming event.
1294+
1295+
Note:
1296+
This is a no-op if not using StreamableHTTP transport with event_store.
1297+
The callback is only available when event_store is configured.
1298+
1299+
Raises:
1300+
NotImplementedError: Feature not yet implemented.
1301+
"""
1302+
raise NotImplementedError("close_sse_stream not yet implemented")
1303+
12851304
# Convenience methods for common log levels
12861305
async def debug(self, message: str, **extra: Any) -> None:
12871306
"""Send a debug log message."""

src/mcp/server/streamable_http.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ def __init__(
140140
is_json_response_enabled: bool = False,
141141
event_store: EventStore | None = None,
142142
security_settings: TransportSecuritySettings | None = None,
143+
retry_interval: int | None = None,
143144
) -> None:
144145
"""
145146
Initialize a new StreamableHTTP server transport.
@@ -153,6 +154,10 @@ def __init__(
153154
resumability will be enabled, allowing clients to
154155
reconnect and resume messages.
155156
security_settings: Optional security settings for DNS rebinding protection.
157+
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
158+
retry field. When set, the server will send a retry field in
159+
SSE priming events to control client reconnection timing for
160+
polling behavior. Only used when event_store is provided.
156161
157162
Raises:
158163
ValueError: If the session ID contains invalid characters.
@@ -164,6 +169,7 @@ def __init__(
164169
self.is_json_response_enabled = is_json_response_enabled
165170
self._event_store = event_store
166171
self._security = TransportSecurityMiddleware(security_settings)
172+
self._retry_interval = retry_interval
167173
self._request_streams: dict[
168174
RequestId,
169175
tuple[
@@ -178,6 +184,26 @@ def is_terminated(self) -> bool:
178184
"""Check if this transport has been explicitly terminated."""
179185
return self._terminated
180186

187+
def close_sse_stream(self, request_id: RequestId) -> None:
188+
"""Close SSE connection for a specific request without terminating the stream.
189+
190+
This method closes the HTTP connection for the specified request, triggering
191+
client reconnection. Events continue to be stored in the event store and will
192+
be replayed when the client reconnects with Last-Event-ID.
193+
194+
Use this to implement polling behavior during long-running operations -
195+
client will reconnect after the retry interval specified in the priming event.
196+
197+
Args:
198+
request_id: The request ID whose SSE stream should be closed.
199+
200+
Note:
201+
This is a no-op if there is no active stream for the request ID.
202+
Requires event_store to be configured for events to be stored during
203+
the disconnect.
204+
"""
205+
raise NotImplementedError("close_sse_stream not yet implemented")
206+
181207
def _create_error_response(
182208
self,
183209
error_message: str,

src/mcp/server/streamable_http_manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ class StreamableHTTPSessionManager:
5151
json_response: Whether to use JSON responses instead of SSE streams
5252
stateless: If True, creates a completely fresh transport for each request
5353
with no session tracking or state persistence between requests.
54+
security_settings: Optional transport security settings.
55+
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
56+
retry field. Used for SSE polling behavior.
5457
"""
5558

5659
def __init__(
@@ -60,12 +63,14 @@ def __init__(
6063
json_response: bool = False,
6164
stateless: bool = False,
6265
security_settings: TransportSecuritySettings | None = None,
66+
retry_interval: int | None = None,
6367
):
6468
self.app = app
6569
self.event_store = event_store
6670
self.json_response = json_response
6771
self.stateless = stateless
6872
self.security_settings = security_settings
73+
self.retry_interval = retry_interval
6974

7075
# Session tracking (only used if not stateless)
7176
self._session_creation_lock = anyio.Lock()
@@ -226,6 +231,7 @@ async def _handle_stateful_request(
226231
is_json_response_enabled=self.json_response,
227232
event_store=self.event_store, # May be None (no resumability)
228233
security_settings=self.security_settings,
234+
retry_interval=self.retry_interval,
229235
)
230236

231237
assert http_transport.mcp_session_id is not None

src/mcp/shared/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from typing_extensions import TypeVar
55

6+
from mcp.shared.message import CloseSSEStreamCallback
67
from mcp.shared.session import BaseSession
78
from mcp.types import RequestId, RequestParams
89

@@ -18,3 +19,4 @@ class RequestContext(Generic[SessionT, LifespanContextT, RequestT]):
1819
session: SessionT
1920
lifespan_context: LifespanContextT
2021
request: RequestT | None = None
22+
close_sse_stream: CloseSSEStreamCallback | None = None

src/mcp/shared/message.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
ResumptionTokenUpdateCallback = Callable[[ResumptionToken], Awaitable[None]]
1616

17+
# Callback type for closing SSE streams without terminating
18+
CloseSSEStreamCallback = Callable[[], Awaitable[None]]
19+
1720

1821
@dataclass
1922
class ClientMessageMetadata:
@@ -30,6 +33,8 @@ class ServerMessageMetadata:
3033
related_request_id: RequestId | None = None
3134
# Request-specific context (e.g., headers, auth info)
3235
request_context: object | None = None
36+
# Callback to close SSE stream without terminating
37+
close_sse_stream: CloseSSEStreamCallback | None = None
3338

3439

3540
MessageMetadata = ClientMessageMetadata | ServerMessageMetadata | None

0 commit comments

Comments
 (0)