Skip to content

Commit 1c51b8f

Browse files
Merge branch 'main' into swathim/mcpsessionchanges
2 parents 1741b1d + b6b4042 commit 1c51b8f

File tree

10 files changed

+533
-29
lines changed

10 files changed

+533
-29
lines changed

README.md

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1394,6 +1394,8 @@ Run from the repository root:
13941394
uvicorn examples.snippets.servers.streamable_http_basic_mounting:app --reload
13951395
"""
13961396

1397+
import contextlib
1398+
13971399
from starlette.applications import Starlette
13981400
from starlette.routing import Mount
13991401

@@ -1409,11 +1411,19 @@ def hello() -> str:
14091411
return "Hello from MCP!"
14101412

14111413

1414+
# Create a lifespan context manager to run the session manager
1415+
@contextlib.asynccontextmanager
1416+
async def lifespan(app: Starlette):
1417+
async with mcp.session_manager.run():
1418+
yield
1419+
1420+
14121421
# Mount the StreamableHTTP server to the existing ASGI server
14131422
app = Starlette(
14141423
routes=[
14151424
Mount("/", app=mcp.streamable_http_app()),
1416-
]
1425+
],
1426+
lifespan=lifespan,
14171427
)
14181428
```
14191429

@@ -1431,6 +1441,8 @@ Run from the repository root:
14311441
uvicorn examples.snippets.servers.streamable_http_host_mounting:app --reload
14321442
"""
14331443

1444+
import contextlib
1445+
14341446
from starlette.applications import Starlette
14351447
from starlette.routing import Host
14361448

@@ -1446,11 +1458,19 @@ def domain_info() -> str:
14461458
return "This is served from mcp.acme.corp"
14471459

14481460

1461+
# Create a lifespan context manager to run the session manager
1462+
@contextlib.asynccontextmanager
1463+
async def lifespan(app: Starlette):
1464+
async with mcp.session_manager.run():
1465+
yield
1466+
1467+
14491468
# Mount using Host-based routing
14501469
app = Starlette(
14511470
routes=[
14521471
Host("mcp.acme.corp", app=mcp.streamable_http_app()),
1453-
]
1472+
],
1473+
lifespan=lifespan,
14541474
)
14551475
```
14561476

@@ -1468,6 +1488,8 @@ Run from the repository root:
14681488
uvicorn examples.snippets.servers.streamable_http_multiple_servers:app --reload
14691489
"""
14701490

1491+
import contextlib
1492+
14711493
from starlette.applications import Starlette
14721494
from starlette.routing import Mount
14731495

@@ -1495,12 +1517,23 @@ def send_message(message: str) -> str:
14951517
api_mcp.settings.streamable_http_path = "/"
14961518
chat_mcp.settings.streamable_http_path = "/"
14971519

1520+
1521+
# Create a combined lifespan to manage both session managers
1522+
@contextlib.asynccontextmanager
1523+
async def lifespan(app: Starlette):
1524+
async with contextlib.AsyncExitStack() as stack:
1525+
await stack.enter_async_context(api_mcp.session_manager.run())
1526+
await stack.enter_async_context(chat_mcp.session_manager.run())
1527+
yield
1528+
1529+
14981530
# Mount the servers
14991531
app = Starlette(
15001532
routes=[
15011533
Mount("/api", app=api_mcp.streamable_http_app()),
15021534
Mount("/chat", app=chat_mcp.streamable_http_app()),
1503-
]
1535+
],
1536+
lifespan=lifespan,
15041537
)
15051538
```
15061539

examples/snippets/servers/streamable_http_basic_mounting.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
uvicorn examples.snippets.servers.streamable_http_basic_mounting:app --reload
66
"""
77

8+
import contextlib
9+
810
from starlette.applications import Starlette
911
from starlette.routing import Mount
1012

@@ -20,9 +22,17 @@ def hello() -> str:
2022
return "Hello from MCP!"
2123

2224

25+
# Create a lifespan context manager to run the session manager
26+
@contextlib.asynccontextmanager
27+
async def lifespan(app: Starlette):
28+
async with mcp.session_manager.run():
29+
yield
30+
31+
2332
# Mount the StreamableHTTP server to the existing ASGI server
2433
app = Starlette(
2534
routes=[
2635
Mount("/", app=mcp.streamable_http_app()),
27-
]
36+
],
37+
lifespan=lifespan,
2838
)

examples/snippets/servers/streamable_http_host_mounting.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
uvicorn examples.snippets.servers.streamable_http_host_mounting:app --reload
66
"""
77

8+
import contextlib
9+
810
from starlette.applications import Starlette
911
from starlette.routing import Host
1012

@@ -20,9 +22,17 @@ def domain_info() -> str:
2022
return "This is served from mcp.acme.corp"
2123

2224

25+
# Create a lifespan context manager to run the session manager
26+
@contextlib.asynccontextmanager
27+
async def lifespan(app: Starlette):
28+
async with mcp.session_manager.run():
29+
yield
30+
31+
2332
# Mount using Host-based routing
2433
app = Starlette(
2534
routes=[
2635
Host("mcp.acme.corp", app=mcp.streamable_http_app()),
27-
]
36+
],
37+
lifespan=lifespan,
2838
)

examples/snippets/servers/streamable_http_multiple_servers.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
uvicorn examples.snippets.servers.streamable_http_multiple_servers:app --reload
66
"""
77

8+
import contextlib
9+
810
from starlette.applications import Starlette
911
from starlette.routing import Mount
1012

@@ -32,10 +34,21 @@ def send_message(message: str) -> str:
3234
api_mcp.settings.streamable_http_path = "/"
3335
chat_mcp.settings.streamable_http_path = "/"
3436

37+
38+
# Create a combined lifespan to manage both session managers
39+
@contextlib.asynccontextmanager
40+
async def lifespan(app: Starlette):
41+
async with contextlib.AsyncExitStack() as stack:
42+
await stack.enter_async_context(api_mcp.session_manager.run())
43+
await stack.enter_async_context(chat_mcp.session_manager.run())
44+
yield
45+
46+
3547
# Mount the servers
3648
app = Starlette(
3749
routes=[
3850
Mount("/api", app=api_mcp.streamable_http_app()),
3951
Mount("/chat", app=chat_mcp.streamable_http_app()),
40-
]
52+
],
53+
lifespan=lifespan,
4154
)

src/mcp/server/streamable_http.py

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -238,30 +238,50 @@ def _create_session_message( # pragma: no cover
238238
message: JSONRPCMessage,
239239
request: Request,
240240
request_id: RequestId,
241+
protocol_version: str,
241242
) -> SessionMessage:
242-
"""Create a session message with metadata including close_sse_stream callback."""
243+
"""Create a session message with metadata including close_sse_stream callback.
243244
244-
async def close_stream_callback() -> None:
245-
self.close_sse_stream(request_id)
245+
The close_sse_stream callbacks are only provided when the client supports
246+
resumability (protocol version >= 2025-11-25). Old clients can't resume if
247+
the stream is closed early because they didn't receive a priming event.
248+
"""
249+
# Only provide close callbacks when client supports resumability
250+
if self._event_store and protocol_version >= "2025-11-25":
246251

247-
async def close_standalone_stream_callback() -> None:
248-
self.close_standalone_sse_stream()
252+
async def close_stream_callback() -> None:
253+
self.close_sse_stream(request_id)
254+
255+
async def close_standalone_stream_callback() -> None:
256+
self.close_standalone_sse_stream()
257+
258+
metadata = ServerMessageMetadata(
259+
request_context=request,
260+
close_sse_stream=close_stream_callback,
261+
close_standalone_sse_stream=close_standalone_stream_callback,
262+
)
263+
else:
264+
metadata = ServerMessageMetadata(request_context=request)
249265

250-
metadata = ServerMessageMetadata(
251-
request_context=request,
252-
close_sse_stream=close_stream_callback,
253-
close_standalone_sse_stream=close_standalone_stream_callback,
254-
)
255266
return SessionMessage(message, metadata=metadata)
256267

257-
async def _send_priming_event( # pragma: no cover
268+
async def _maybe_send_priming_event(
258269
self,
259270
request_id: RequestId,
260271
sse_stream_writer: MemoryObjectSendStream[dict[str, Any]],
272+
protocol_version: str,
261273
) -> None:
262-
"""Send priming event for SSE resumability if event_store is configured."""
274+
"""Send priming event for SSE resumability if event_store is configured.
275+
276+
Only sends priming events to clients with protocol version >= 2025-11-25,
277+
which includes the fix for handling empty SSE data. Older clients would
278+
crash trying to parse empty data as JSON.
279+
"""
263280
if not self._event_store:
264281
return
282+
# Priming events have empty data which older clients cannot handle.
283+
if protocol_version < "2025-11-25":
284+
return
265285
priming_event_id = await self._event_store.store_event(
266286
str(request_id), # Convert RequestId to StreamId (str)
267287
None, # Priming event has no payload
@@ -499,6 +519,15 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
499519

500520
return
501521

522+
# Extract protocol version for priming event decision.
523+
# For initialize requests, get from request params.
524+
# For other requests, get from header (already validated).
525+
protocol_version = (
526+
str(message.root.params.get("protocolVersion", DEFAULT_NEGOTIATED_VERSION))
527+
if is_initialization_request and message.root.params
528+
else request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
529+
)
530+
502531
# Extract the request ID outside the try block for proper scope
503532
request_id = str(message.root.id) # pragma: no cover
504533
# Register this stream for the request ID
@@ -560,7 +589,7 @@ async def sse_writer():
560589
try:
561590
async with sse_stream_writer, request_stream_reader:
562591
# Send priming event for SSE resumability
563-
await self._send_priming_event(request_id, sse_stream_writer)
592+
await self._maybe_send_priming_event(request_id, sse_stream_writer, protocol_version)
564593

565594
# Process messages from the request-specific stream
566595
async for event_message in request_stream_reader:
@@ -605,7 +634,7 @@ async def sse_writer():
605634
async with anyio.create_task_group() as tg:
606635
tg.start_soon(response, scope, receive, send)
607636
# Then send the message to be processed by the server
608-
session_message = self._create_session_message(message, request, request_id)
637+
session_message = self._create_session_message(message, request, request_id, protocol_version)
609638
await writer.send(session_message)
610639
except Exception:
611640
logger.exception("SSE response error")
@@ -864,6 +893,9 @@ async def _replay_events(self, last_event_id: str, request: Request, send: Send)
864893
if self.mcp_session_id:
865894
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
866895

896+
# Get protocol version from header (already validated in _validate_protocol_version)
897+
replay_protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
898+
867899
# Create SSE stream for replay
868900
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)
869901

@@ -884,7 +916,7 @@ async def send_event(event_message: EventMessage) -> None:
884916
self._sse_stream_writers[stream_id] = sse_stream_writer
885917

886918
# Send priming event for this new connection
887-
await self._send_priming_event(stream_id, sse_stream_writer)
919+
await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)
888920

889921
# Create new request streams for this connection
890922
self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)
@@ -1000,11 +1032,16 @@ async def message_router(): # pragma: no cover
10001032
# Stream might be closed, remove from registry
10011033
self._request_streams.pop(request_stream_id, None)
10021034
else:
1003-
logging.debug(
1035+
logger.debug(
10041036
f"""Request stream {request_stream_id} not found
10051037
for message. Still processing message as the client
10061038
might reconnect and replay."""
10071039
)
1040+
except anyio.ClosedResourceError:
1041+
if self._terminated:
1042+
logger.debug("Read stream closed by client")
1043+
else:
1044+
logger.exception("Unexpected closure of read stream in message router")
10081045
except Exception:
10091046
logger.exception("Error in message router")
10101047

src/mcp/shared/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
from mcp.types import LATEST_PROTOCOL_VERSION
22

3-
SUPPORTED_PROTOCOL_VERSIONS: list[str] = ["2024-11-05", "2025-03-26", LATEST_PROTOCOL_VERSION]
3+
SUPPORTED_PROTOCOL_VERSIONS: list[str] = ["2024-11-05", "2025-03-26", "2025-06-18", LATEST_PROTOCOL_VERSION]

src/mcp/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
not separate types in the schema.
2626
"""
2727

28-
LATEST_PROTOCOL_VERSION = "2025-06-18"
28+
LATEST_PROTOCOL_VERSION = "2025-11-25"
2929

3030
"""
3131
The default negotiated version of the Model Context Protocol when no version is specified.

tests/issues/test_1027_win_unreachable_cleanup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def echo(text: str) -> str:
9595
async with ClientSession(read, write) as session:
9696
# Initialize the session
9797
result = await session.initialize()
98-
assert result.protocolVersion in ["2024-11-05", "2025-06-18"]
98+
assert result.protocolVersion in ["2024-11-05", "2025-06-18", "2025-11-25"]
9999

100100
# Verify startup marker was created
101101
assert Path(startup_marker).exists(), "Server startup marker not created"

0 commit comments

Comments
 (0)