Skip to content

Commit 2b95598

Browse files
committed
initial streamable http server
1 parent babb477 commit 2b95598

File tree

10 files changed

+727
-8
lines changed

10 files changed

+727
-8
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# MCP Simple StreamableHttp Server Example
2+
3+
A simple MCP server example demonstrating the StreamableHttp transport, which enables HTTP-based communication with MCP servers using streaming.
4+
5+
## Features
6+
7+
- Uses the StreamableHTTP transport for server-client communication
8+
- Task management with anyio task groups
9+
- Ability to send multiple notifications over time to the client
10+
- Proper resource cleanup and lifespan management
11+
12+
## Usage
13+
14+
Start the server on the default or custom port:
15+
16+
```bash
17+
18+
# Using custom port
19+
uv run mcp-simple-streamablehttp --port 3000
20+
21+
# Custom logging level
22+
uv run mcp-simple-streamablehttp --log-level DEBUG
23+
```
24+
25+
The server exposes a tool named "start-notification-stream" that accepts three arguments:
26+
27+
- `interval`: Time between notifications in seconds (e.g., 1.0)
28+
- `count`: Number of notifications to send (e.g., 5)
29+
- `caller`: Identifier string for the caller
30+
31+
## Client
32+
33+
You can connect to this server using an HTTP client, for now only Typescript SDK has streamable HTTP client examples or you can use (Inspector)[https://github.com/modelcontextprotocol/inspector]

examples/servers/simple-streamablehttp/mcp_simple_streamablehttp/__init__.py

Whitespace-only changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from .server import main
2+
3+
if __name__ == "__main__":
4+
main()
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import contextlib
2+
import logging
3+
from uuid import uuid4
4+
5+
import anyio
6+
import click
7+
import mcp.types as types
8+
from mcp.server.lowlevel import Server
9+
from mcp.server.streamableHttp import StreamableHTTPServerTransport
10+
from starlette.applications import Starlette
11+
from starlette.routing import Mount
12+
13+
# Configure logging
14+
logger = logging.getLogger(__name__)
15+
16+
# Global task group that will be initialized in the lifespan
17+
task_group = None
18+
19+
20+
@contextlib.asynccontextmanager
21+
async def lifespan(app):
22+
"""Application lifespan context manager for managing task group."""
23+
global task_group
24+
25+
async with anyio.create_task_group() as tg:
26+
task_group = tg
27+
logger.info("Application started, task group initialized!")
28+
try:
29+
yield
30+
finally:
31+
logger.info("Application shutting down, cleaning up resources...")
32+
if task_group:
33+
tg.cancel_scope.cancel()
34+
task_group = None
35+
logger.info("Resources cleaned up successfully.")
36+
37+
38+
@click.command()
39+
@click.option("--port", default=3000, help="Port to listen on for HTTP")
40+
@click.option(
41+
"--log-level",
42+
default="INFO",
43+
help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
44+
)
45+
def main(
46+
port: int,
47+
log_level: str,
48+
) -> int:
49+
# Configure logging
50+
logging.basicConfig(
51+
level=getattr(logging, log_level.upper()),
52+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
53+
)
54+
55+
app = Server("mcp-streamable-http-demo")
56+
57+
@app.call_tool()
58+
async def call_tool(
59+
name: str, arguments: dict
60+
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
61+
ctx = app.request_context
62+
interval = arguments.get("interval", 1.0)
63+
count = arguments.get("count", 5)
64+
caller = arguments.get("caller", "unknown")
65+
66+
# Send the specified number of notifications with the given interval
67+
for i in range(count):
68+
await ctx.session.send_log_message(
69+
level="info",
70+
data=f"Notification {i+1}/{count} from caller: {caller}",
71+
logger="notification_stream",
72+
related_request_id=ctx.request_id,
73+
)
74+
if i < count - 1: # Don't wait after the last notification
75+
await anyio.sleep(interval)
76+
77+
return [
78+
types.TextContent(
79+
type="text",
80+
text=(
81+
f"Sent {count} notifications with {interval}s interval"
82+
f" for caller: {caller}"
83+
),
84+
)
85+
]
86+
87+
@app.list_tools()
88+
async def list_tools() -> list[types.Tool]:
89+
return [
90+
types.Tool(
91+
name="start-notification-stream",
92+
description=(
93+
"Sends a stream of notifications with configurable count"
94+
" and interval"
95+
),
96+
inputSchema={
97+
"type": "object",
98+
"required": ["interval", "count", "caller"],
99+
"properties": {
100+
"interval": {
101+
"type": "number",
102+
"description": "Interval between notifications in seconds",
103+
},
104+
"count": {
105+
"type": "number",
106+
"description": "Number of notifications to send",
107+
},
108+
"caller": {
109+
"type": "string",
110+
"description": (
111+
"Identifier of the caller to include in notifications"
112+
),
113+
},
114+
},
115+
},
116+
)
117+
]
118+
119+
# Create a Streamable HTTP transport
120+
http_transport = StreamableHTTPServerTransport(
121+
mcp_session_id=uuid4().hex,
122+
)
123+
124+
# We need to store the server instances between requests
125+
server_instances = {}
126+
127+
# ASGI handler for streamable HTTP connections
128+
async def handle_streamable_http(scope, receive, send):
129+
if http_transport.mcp_session_id in server_instances:
130+
logger.debug("Session already exists, handling request directly")
131+
await http_transport.handle_request(scope, receive, send)
132+
else:
133+
# Start new server instance for this session
134+
async with http_transport.connect() as streams:
135+
read_stream, write_stream = streams
136+
137+
async def run_server():
138+
await app.run(
139+
read_stream, write_stream, app.create_initialization_options()
140+
)
141+
142+
if not task_group:
143+
raise RuntimeError("Task group is not initialized")
144+
145+
task_group.start_soon(run_server)
146+
147+
# For initialization requests, store the server reference
148+
if http_transport.mcp_session_id:
149+
server_instances[http_transport.mcp_session_id] = True
150+
151+
# Handle the HTTP request and return the response
152+
await http_transport.handle_request(scope, receive, send)
153+
154+
# Create an ASGI application using the transport
155+
starlette_app = Starlette(
156+
debug=True,
157+
routes=[
158+
Mount("/mcp", app=handle_streamable_http),
159+
],
160+
lifespan=lifespan,
161+
)
162+
163+
import uvicorn
164+
165+
uvicorn.run(starlette_app, host="0.0.0.0", port=port)
166+
167+
return 0
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
[project]
2+
name = "mcp-simple-streamablehttp"
3+
version = "0.1.0"
4+
description = "A simple MCP server exposing a website fetching tool with StreamableHttp transport"
5+
readme = "README.md"
6+
requires-python = ">=3.10"
7+
authors = [{ name = "Anthropic, PBC." }]
8+
maintainers = [
9+
{ name = "David Soria Parra", email = "davidsp@anthropic.com" },
10+
{ name = "Justin Spahr-Summers", email = "justin@anthropic.com" },
11+
]
12+
keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable"]
13+
license = { text = "MIT" }
14+
classifiers = [
15+
"Development Status :: 4 - Beta",
16+
"Intended Audience :: Developers",
17+
"License :: OSI Approved :: MIT License",
18+
"Programming Language :: Python :: 3",
19+
"Programming Language :: Python :: 3.10",
20+
]
21+
dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"]
22+
23+
[project.scripts]
24+
mcp-simple-streamablehttp = "mcp_simple_streamablehttp.server:main"
25+
26+
[build-system]
27+
requires = ["hatchling"]
28+
build-backend = "hatchling.build"
29+
30+
[tool.hatch.build.targets.wheel]
31+
packages = ["mcp_simple_streamablehttp"]
32+
33+
[tool.pyright]
34+
include = ["mcp_simple_streamablehttp"]
35+
venvPath = "."
36+
venv = ".venv"
37+
38+
[tool.ruff.lint]
39+
select = ["E", "F", "I"]
40+
ignore = []
41+
42+
[tool.ruff]
43+
line-length = 88
44+
target-version = "py310"
45+
46+
[tool.uv]
47+
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]

src/mcp/server/session.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,11 @@ async def _received_notification(
179179
)
180180

181181
async def send_log_message(
182-
self, level: types.LoggingLevel, data: Any, logger: str | None = None
182+
self,
183+
level: types.LoggingLevel,
184+
data: Any,
185+
logger: str | None = None,
186+
related_request_id: types.RequestId | None = None,
183187
) -> None:
184188
"""Send a log message notification."""
185189
await self.send_notification(
@@ -192,7 +196,8 @@ async def send_log_message(
192196
logger=logger,
193197
),
194198
)
195-
)
199+
),
200+
related_request_id,
196201
)
197202

198203
async def send_resource_updated(self, uri: AnyUrl) -> None:
@@ -261,7 +266,11 @@ async def send_ping(self) -> types.EmptyResult:
261266
)
262267

263268
async def send_progress_notification(
264-
self, progress_token: str | int, progress: float, total: float | None = None
269+
self,
270+
progress_token: str | int,
271+
progress: float,
272+
total: float | None = None,
273+
related_request_id: str | None = None,
265274
) -> None:
266275
"""Send a progress notification."""
267276
await self.send_notification(
@@ -274,7 +283,8 @@ async def send_progress_notification(
274283
total=total,
275284
),
276285
)
277-
)
286+
),
287+
related_request_id,
278288
)
279289

280290
async def send_resource_list_changed(self) -> None:

0 commit comments

Comments
 (0)