Skip to content

Commit 6c9c320

Browse files
committed
json mode
1 parent 2cebf08 commit 6c9c320

File tree

4 files changed

+258
-105
lines changed

4 files changed

+258
-105
lines changed

examples/servers/simple-streamablehttp/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ uv run mcp-simple-streamablehttp --port 3000
2020

2121
# Custom logging level
2222
uv run mcp-simple-streamablehttp --log-level DEBUG
23+
24+
# Enable JSON responses instead of SSE streams
25+
uv run mcp-simple-streamablehttp --json-response
2326
```
2427

2528
The server exposes a tool named "start-notification-stream" that accepts three arguments:

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/server.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,16 @@ async def lifespan(app):
4848
default="INFO",
4949
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
5050
)
51+
@click.option(
52+
"--json-response",
53+
is_flag=True,
54+
default=False,
55+
help="Enable JSON responses instead of SSE streams",
56+
)
5157
def main(
5258
port: int,
5359
log_level: str,
60+
json_response: bool,
5461
) -> int:
5562
# Configure logging
5663
logging.basicConfig(
@@ -145,7 +152,7 @@ async def handle_streamable_http(scope, receive, send):
145152
async with session_creation_lock:
146153
new_session_id = uuid4().hex
147154
http_transport = StreamableHTTPServerTransport(
148-
mcp_session_id=new_session_id,
155+
mcp_session_id=new_session_id, is_json_response_enabled=json_response
149156
)
150157
async with http_transport.connect() as streams:
151158
read_stream, write_stream = streams

src/mcp/server/streamableHttp.py

Lines changed: 204 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class StreamableHTTPServerTransport:
5454
5555
Handles POST requests containing JSON-RPC messages and provides
5656
Server-Sent Events (SSE) responses for streaming communication.
57+
When configured, can also return JSON responses instead of SSE streams.
5758
"""
5859

5960
# Server notification streams for POST requests as well as standalone SSE stream
@@ -65,13 +66,16 @@ class StreamableHTTPServerTransport:
6566
def __init__(
6667
self,
6768
mcp_session_id: str | None,
69+
is_json_response_enabled: bool = False,
6870
):
6971
"""
7072
Initialize a new StreamableHTTP server transport.
7173
7274
Args:
7375
mcp_session_id: Optional session identifier for this connection.
7476
Must contain only visible ASCII characters (0x21-0x7E).
77+
is_json_response_enabled: If True, return JSON responses for requests
78+
instead of SSE streams. Default is False.
7579
7680
Raises:
7781
ValueError: If the session ID contains invalid characters.
@@ -85,6 +89,7 @@ def __init__(
8589
)
8690

8791
self.mcp_session_id = mcp_session_id
92+
self.is_json_response_enabled = is_json_response_enabled
8893
self._request_streams = {}
8994
self._terminated = False
9095

@@ -110,6 +115,36 @@ def _create_error_response(
110115
headers=response_headers,
111116
)
112117

118+
def _create_json_response(
119+
self,
120+
response_message: JSONRPCMessage,
121+
status_code: HTTPStatus = HTTPStatus.OK,
122+
headers: dict[str, str] | None = None,
123+
) -> Response:
124+
"""
125+
Create a JSON response from a JSONRPCMessage.
126+
127+
Args:
128+
response_message: The JSON-RPC message to include in the response
129+
status_code: HTTP status code (default: 200 OK)
130+
headers: Additional headers to include
131+
132+
Returns:
133+
A Starlette Response object with the JSON-RPC message
134+
"""
135+
response_headers = {"Content-Type": CONTENT_TYPE_JSON}
136+
if headers:
137+
response_headers.update(headers)
138+
139+
if self.mcp_session_id:
140+
response_headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
141+
142+
return Response(
143+
response_message.model_dump_json(by_alias=True, exclude_none=True),
144+
status_code=status_code,
145+
headers=response_headers,
146+
)
147+
113148
def _get_session_id(self, request: Request) -> str | None:
114149
"""
115150
Extract the session ID from request headers.
@@ -303,105 +338,183 @@ async def _handle_post_request(
303338

304339
return
305340

306-
# For requests, set up an SSE stream for the response
341+
# For requests, determine whether to return JSON or set up SSE stream
307342
if is_request:
308-
# Set up headers
309-
headers = {
310-
"Cache-Control": "no-cache, no-transform",
311-
"Connection": "keep-alive",
312-
"Content-Type": CONTENT_TYPE_SSE,
313-
}
343+
if self.is_json_response_enabled:
344+
# JSON response mode - create a response future
345+
request_id = None
346+
if isinstance(message.root, JSONRPCRequest):
347+
request_id = str(message.root.id)
348+
349+
if not request_id:
350+
# Should not happen for valid JSONRPCRequest, but handle just in case
351+
response = self._create_error_response(
352+
"Invalid Request: Missing request ID",
353+
HTTPStatus.BAD_REQUEST,
354+
)
355+
await response(scope, receive, send)
356+
return
314357

315-
if self.mcp_session_id:
316-
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
317-
# Create SSE stream
318-
sse_stream_writer, sse_stream_reader = (
319-
anyio.create_memory_object_stream[dict[str, Any]](0)
320-
)
358+
# Create promise stream for getting response
359+
request_stream_writer, request_stream_reader = (
360+
anyio.create_memory_object_stream[JSONRPCMessage](0)
361+
)
321362

322-
async def sse_writer():
323-
try:
324-
# Create a request-specific message stream for this POST request
325-
request_stream_writer, request_stream_reader = (
326-
anyio.create_memory_object_stream[JSONRPCMessage](0)
327-
)
363+
# Register this stream for the request ID
364+
self._request_streams[request_id] = request_stream_writer
328365

329-
# Get the request ID from the incoming request message
330-
request_id = None
331-
if isinstance(message.root, JSONRPCRequest):
332-
request_id = str(message.root.id)
333-
# Register this stream for the request ID
334-
if request_id:
335-
self._request_streams[request_id] = (
336-
request_stream_writer
337-
)
366+
# Process the message
367+
await writer.send(message)
338368

339-
async with sse_stream_writer, request_stream_reader:
340-
# Process messages from the request-specific stream
341-
async for received_message in request_stream_reader:
342-
# Send the message via SSE
343-
related_request_id = None
344-
345-
if isinstance(
346-
received_message.root, JSONRPCNotification
347-
):
348-
# Get related_request_id from params
349-
params = received_message.root.params
350-
if params and "related_request_id" in params:
351-
related_request_id = params.get(
352-
"related_request_id"
353-
)
354-
logger.debug(
355-
f"NOTIFICATION: {related_request_id}, "
356-
f"{params.get('data')}"
357-
)
358-
359-
# Build the event data
360-
event_data = {
361-
"event": "message",
362-
"data": received_message.model_dump_json(
363-
by_alias=True, exclude_none=True
364-
),
365-
}
366-
367-
await sse_stream_writer.send(event_data)
368-
369-
# If response, remove from pending streams and close
370-
if isinstance(received_message.root, JSONRPCResponse):
371-
if request_id:
372-
self._request_streams.pop(request_id, None)
373-
break
369+
try:
370+
# Process messages from the request-specific stream
371+
# We need to collect all messages until we get a response
372+
response_message = None
373+
374+
# Use similar approach to SSE writer for consistency
375+
async for received_message in request_stream_reader:
376+
# If it's a response, this is what we're waiting for
377+
if isinstance(received_message.root, JSONRPCResponse):
378+
response_message = received_message
379+
break
380+
# For notifications, we need to keep waiting for the actual response
381+
elif isinstance(received_message.root, JSONRPCNotification):
382+
# Just process it and continue waiting
383+
logger.debug(
384+
f"Received notification while waiting for response: {received_message.root.method}"
385+
)
386+
continue
387+
388+
# At this point we should have a response
389+
if response_message:
390+
# Create JSON response
391+
response = self._create_json_response(response_message)
392+
await response(scope, receive, send)
393+
else:
394+
# This shouldn't happen in normal operation
395+
logger.error("No response message received before stream closed")
396+
response = self._create_error_response(
397+
"Error processing request: No response received",
398+
HTTPStatus.INTERNAL_SERVER_ERROR,
399+
)
400+
await response(scope, receive, send)
374401
except Exception as e:
375-
logger.exception(f"Error in SSE writer: {e}")
402+
logger.exception(f"Error processing JSON response: {e}")
403+
response = self._create_error_response(
404+
f"Error processing request: {str(e)}",
405+
HTTPStatus.INTERNAL_SERVER_ERROR,
406+
)
407+
await response(scope, receive, send)
376408
finally:
377-
logger.debug("Closing SSE writer")
378-
# TODO
379-
380-
# Create and start EventSourceResponse
381-
response = EventSourceResponse(
382-
content=sse_stream_reader,
383-
data_sender_callable=sse_writer,
384-
headers=headers,
385-
)
386-
387-
# Extract the request ID outside the try block for proper scope
388-
outer_request_id = None
389-
if isinstance(message.root, JSONRPCRequest):
390-
outer_request_id = str(message.root.id)
409+
# Clean up the request stream
410+
if request_id in self._request_streams:
411+
self._request_streams.pop(request_id, None)
412+
await request_stream_reader.aclose()
413+
await request_stream_writer.aclose()
414+
else:
415+
# SSE stream mode (original behavior)
416+
# Set up headers
417+
headers = {
418+
"Cache-Control": "no-cache, no-transform",
419+
"Connection": "keep-alive",
420+
"Content-Type": CONTENT_TYPE_SSE,
421+
}
422+
423+
if self.mcp_session_id:
424+
headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id
425+
# Create SSE stream
426+
sse_stream_writer, sse_stream_reader = (
427+
anyio.create_memory_object_stream[dict[str, Any]](0)
428+
)
429+
430+
async def sse_writer():
431+
try:
432+
# Create a request-specific message stream for this POST request
433+
request_stream_writer, request_stream_reader = (
434+
anyio.create_memory_object_stream[JSONRPCMessage](0)
435+
)
436+
437+
# Get the request ID from the incoming request message
438+
request_id = None
439+
if isinstance(message.root, JSONRPCRequest):
440+
request_id = str(message.root.id)
441+
# Register this stream for the request ID
442+
if request_id:
443+
self._request_streams[request_id] = (
444+
request_stream_writer
445+
)
446+
447+
async with sse_stream_writer, request_stream_reader:
448+
# Process messages from the request-specific stream
449+
async for received_message in request_stream_reader:
450+
# Send the message via SSE
451+
related_request_id = None
452+
453+
if isinstance(
454+
received_message.root, JSONRPCNotification
455+
):
456+
# Get related_request_id from params
457+
params = received_message.root.params
458+
if params and "related_request_id" in params:
459+
related_request_id = params.get(
460+
"related_request_id"
461+
)
462+
logger.debug(
463+
f"NOTIFICATION: {related_request_id}, "
464+
f"{params.get('data')}"
465+
)
466+
467+
# Build the event data
468+
event_data = {
469+
"event": "message",
470+
"data": received_message.model_dump_json(
471+
by_alias=True, exclude_none=True
472+
),
473+
}
474+
475+
await sse_stream_writer.send(event_data)
476+
477+
# If response, remove from pending streams and close
478+
if isinstance(
479+
received_message.root, JSONRPCResponse
480+
):
481+
if request_id:
482+
self._request_streams.pop(request_id, None)
483+
break
484+
except Exception as e:
485+
logger.exception(f"Error in SSE writer: {e}")
486+
finally:
487+
logger.debug("Closing SSE writer")
488+
# TODO
489+
490+
# Create and start EventSourceResponse
491+
response = EventSourceResponse(
492+
content=sse_stream_reader,
493+
data_sender_callable=sse_writer,
494+
headers=headers,
495+
)
496+
497+
# Extract the request ID outside the try block for proper scope
498+
outer_request_id = None
499+
if isinstance(message.root, JSONRPCRequest):
500+
outer_request_id = str(message.root.id)
501+
502+
# Start the SSE response (this will send headers immediately)
503+
try:
504+
# First send the response to establish the SSE connection
505+
async with anyio.create_task_group() as tg:
506+
tg.start_soon(response, scope, receive, send)
391507

392-
# Start the SSE response (this will send headers immediately)
393-
try:
394-
# First send the response to establish the SSE connection
395-
async with anyio.create_task_group() as tg:
396-
tg.start_soon(response, scope, receive, send)
397-
398-
# Then send the message to be processed by the server
399-
await writer.send(message)
400-
except Exception:
401-
logger.exception("SSE response error")
402-
# Make sure to clean up the request stream if something goes wrong
403-
if outer_request_id and outer_request_id in self._request_streams:
404-
self._request_streams.pop(outer_request_id, None)
508+
# Then send the message to be processed by the server
509+
await writer.send(message)
510+
except Exception:
511+
logger.exception("SSE response error")
512+
# Make sure to clean up the request stream if something goes wrong
513+
if (
514+
outer_request_id
515+
and outer_request_id in self._request_streams
516+
):
517+
self._request_streams.pop(outer_request_id, None)
405518

406519
except Exception as err:
407520
logger.exception("Error handling POST request")

0 commit comments

Comments
 (0)