Skip to content

Commit b1be691

Browse files
committed
get sse
1 parent 6b7a616 commit b1be691

File tree

3 files changed

+190
-20
lines changed

3 files changed

+190
-20
lines changed

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
MCP_SESSION_ID_HEADER,
1212
StreamableHTTPServerTransport,
1313
)
14+
from pydantic import AnyUrl
1415
from starlette.applications import Starlette
1516
from starlette.requests import Request
1617
from starlette.responses import Response
@@ -87,6 +88,9 @@ async def call_tool(
8788
if i < count - 1: # Don't wait after the last notification
8889
await anyio.sleep(interval)
8990

91+
# This will send a resource notificaiton though standalone SSE
92+
# established by GET request
93+
await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource"))
9094
return [
9195
types.TextContent(
9296
type="text",

src/mcp/server/streamableHttp.py

Lines changed: 97 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
CONTENT_TYPE_JSON = "application/json"
4444
CONTENT_TYPE_SSE = "text/event-stream"
4545

46+
# Special key for the standalone GET stream
47+
GET_STREAM_KEY = "_GET_stream"
48+
4649
# Session ID validation pattern (visible ASCII characters ranging from 0x21 to 0x7E)
4750
# Pattern ensures entire string contains only valid characters by using ^ and $ anchors
4851
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")
@@ -476,10 +479,19 @@ async def sse_writer():
476479
return
477480

478481
async def _handle_get_request(self, request: Request, send: Send) -> None:
479-
"""Handle GET requests for SSE stream establishment."""
480-
# Validate session ID if server has one
481-
if not await self._validate_session(request, send):
482-
return
482+
"""
483+
Handle GET request to establish SSE.
484+
485+
This allows the server to communicate to the client without the client
486+
first sending data via HTTP POST. The server can send JSON-RPC requests
487+
and notifications on this stream.
488+
"""
489+
writer = self._read_stream_writer
490+
if writer is None:
491+
raise ValueError(
492+
"No read stream writer available. Ensure connect() is called first."
493+
)
494+
483495
# Validate Accept header - must include text/event-stream
484496
_, has_sse = self._check_accept_headers(request)
485497

@@ -491,13 +503,80 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
491503
await response(request.scope, request.receive, send)
492504
return
493505

494-
# TODO: Implement SSE stream for GET requests
495-
# For now, return 405 Method Not Allowed
496-
response = self._create_server_response(
497-
"SSE stream from GET request not implemented yet",
498-
HTTPStatus.METHOD_NOT_ALLOWED,
506+
if not await self._validate_session(request, send):
507+
return
508+
509+
headers = {
510+
"Cache-Control": "no-cache, no-transform",
511+
"Connection": "keep-alive",
512+
"Content-Type": CONTENT_TYPE_SSE,
513+
}
514+
515+
if self.mcp_session_id:
516+
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
517+
518+
# Check if we already have an active GET stream
519+
if GET_STREAM_KEY in self._request_streams:
520+
response = self._create_server_response(
521+
"Conflict: Only one SSE stream is allowed per session",
522+
HTTPStatus.CONFLICT,
523+
)
524+
await response(request.scope, request.receive, send)
525+
return
526+
527+
# Create SSE stream
528+
sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[
529+
dict[str, Any]
530+
](0)
531+
532+
async def standalone_sse_writer():
533+
try:
534+
# Create a standalone message stream for server-initiated messages
535+
standalone_stream_writer, standalone_stream_reader = (
536+
anyio.create_memory_object_stream[JSONRPCMessage](0)
537+
)
538+
539+
# Register this stream using the special key
540+
self._request_streams[GET_STREAM_KEY] = standalone_stream_writer
541+
542+
async with sse_stream_writer, standalone_stream_reader:
543+
# Process messages from the standalone stream
544+
async for received_message in standalone_stream_reader:
545+
# For the standalone stream, we handle:
546+
# - JSONRPCNotification (server can send notifications to client)
547+
# - JSONRPCRequest (server can send requests to client)
548+
# We should NOT receive JSONRPCResponse
549+
550+
# Send the message via SSE
551+
event_data = {
552+
"event": "message",
553+
"data": received_message.model_dump_json(
554+
by_alias=True, exclude_none=True
555+
),
556+
}
557+
558+
await sse_stream_writer.send(event_data)
559+
except Exception as e:
560+
logger.exception(f"Error in standalone SSE writer: {e}")
561+
finally:
562+
logger.debug("Closing standalone SSE writer")
563+
# Remove the stream from request_streams
564+
self._request_streams.pop(GET_STREAM_KEY, None)
565+
566+
# Create and start EventSourceResponse
567+
response = EventSourceResponse(
568+
content=sse_stream_reader,
569+
data_sender_callable=standalone_sse_writer,
570+
headers=headers,
499571
)
500-
await response(request.scope, request.receive, send)
572+
573+
try:
574+
# This will send headers immediately and establish the SSE connection
575+
await response(request.scope, request.receive, send)
576+
except Exception as e:
577+
logger.exception(f"Error in standalone SSE response: {e}")
578+
# Clean up the request stream
579+
self._request_streams.pop(GET_STREAM_KEY, None)
501580

502581
async def _handle_delete_request(self, request: Request, send: Send) -> None:
503582
"""Handle DELETE requests for explicit session termination."""
@@ -639,30 +718,28 @@ async def message_router():
639718
# For responses, route based on the request ID
640719
if isinstance(message.root, JSONRPCResponse):
641720
target_request_id = str(message.root.id)
642-
# For notifications, route by related_request_id if available
643-
elif isinstance(message.root, JSONRPCNotification):
644-
# Get related_request_id from params
721+
# For notifications and requests, handle routing logic
722+
elif isinstance(
723+
message.root, JSONRPCNotification
724+
) or isinstance(message.root, JSONRPCRequest):
645725
params = message.root.params
646726
if params and "related_request_id" in params:
647727
related_id = params.get("related_request_id")
648728
if related_id is not None:
649729
target_request_id = str(related_id)
650730

651-
# Send to the specific request stream if available
652-
if (
653-
target_request_id
654-
and target_request_id in self._request_streams
655-
):
731+
request_stream_id = target_request_id or GET_STREAM_KEY
732+
if request_stream_id in self._request_streams:
656733
try:
657-
await self._request_streams[target_request_id].send(
734+
await self._request_streams[request_stream_id].send(
658735
message
659736
)
660737
except (
661738
anyio.BrokenResourceError,
662739
anyio.ClosedResourceError,
663740
):
664741
# Stream might be closed, remove from registry
665-
self._request_streams.pop(target_request_id, None)
742+
self._request_streams.pop(request_stream_id, None)
666743
except Exception as e:
667744
logger.exception(f"Error in message router: {e}")
668745

tests/server/test_streamableHttp.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,3 +542,92 @@ def test_json_response(json_response_server, json_server_url):
542542
)
543543
assert response.status_code == 200
544544
assert response.headers.get("Content-Type") == "application/json"
545+
546+
547+
def test_get_sse_stream(basic_server, basic_server_url):
548+
"""Test establishing an SSE stream via GET request."""
549+
# First, we need to initialize a session
550+
mcp_url = f"{basic_server_url}/mcp"
551+
init_response = requests.post(
552+
mcp_url,
553+
headers={
554+
"Accept": "application/json, text/event-stream",
555+
"Content-Type": "application/json",
556+
},
557+
json=INIT_REQUEST,
558+
)
559+
assert init_response.status_code == 200
560+
561+
# Get the session ID
562+
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
563+
assert session_id is not None
564+
565+
# Now attempt to establish an SSE stream via GET
566+
get_response = requests.get(
567+
mcp_url,
568+
headers={
569+
"Accept": "text/event-stream",
570+
MCP_SESSION_ID_HEADER: session_id,
571+
},
572+
stream=True,
573+
)
574+
575+
# Verify we got a successful response with the right content type
576+
assert get_response.status_code == 200
577+
assert get_response.headers.get("Content-Type") == "text/event-stream"
578+
579+
# Test that a second GET request gets rejected (only one stream allowed)
580+
second_get = requests.get(
581+
mcp_url,
582+
headers={
583+
"Accept": "text/event-stream",
584+
MCP_SESSION_ID_HEADER: session_id,
585+
},
586+
stream=True,
587+
)
588+
589+
# Should get CONFLICT (409) since there's already a stream
590+
# Note: This might fail if the first stream fully closed before this runs,
591+
# but generally it should work in the test environment where it runs quickly
592+
assert second_get.status_code == 409
593+
594+
595+
def test_get_validation(basic_server, basic_server_url):
596+
"""Test validation for GET requests."""
597+
# First, we need to initialize a session
598+
mcp_url = f"{basic_server_url}/mcp"
599+
init_response = requests.post(
600+
mcp_url,
601+
headers={
602+
"Accept": "application/json, text/event-stream",
603+
"Content-Type": "application/json",
604+
},
605+
json=INIT_REQUEST,
606+
)
607+
assert init_response.status_code == 200
608+
609+
# Get the session ID
610+
session_id = init_response.headers.get(MCP_SESSION_ID_HEADER)
611+
assert session_id is not None
612+
613+
# Test without Accept header
614+
response = requests.get(
615+
mcp_url,
616+
headers={
617+
MCP_SESSION_ID_HEADER: session_id,
618+
},
619+
stream=True,
620+
)
621+
assert response.status_code == 406
622+
assert "Not Acceptable" in response.text
623+
624+
# Test with wrong Accept header
625+
response = requests.get(
626+
mcp_url,
627+
headers={
628+
"Accept": "application/json",
629+
MCP_SESSION_ID_HEADER: session_id,
630+
},
631+
)
632+
assert response.status_code == 406
633+
assert "Not Acceptable" in response.text

0 commit comments

Comments
 (0)