Skip to content

Commit bb24881

Browse files
committed
testing
1 parent 8d6a20d commit bb24881

File tree

10 files changed

+664
-3
lines changed

10 files changed

+664
-3
lines changed

examples/servers/simple-prompt/mcp_simple_prompt/server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ async def get_prompt(
9292
from starlette.applications import Starlette
9393
from starlette.routing import Mount, Route
9494

95-
sse = SseServerTransport("/messages/")
95+
from mcp.server.message_queue.redis import RedisMessageDispatch
96+
97+
message_dispatch = RedisMessageDispatch("redis://localhost:6379/0")
98+
99+
sse = SseServerTransport("/messages/", message_dispatch=message_dispatch)
96100

97101
async def handle_sse(request):
98102
async with sse.connect_sse(

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencies = [
3030
"sse-starlette>=1.6.1",
3131
"pydantic-settings>=2.5.2",
3232
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
33+
"fakeredis==2.28.1",
3334
]
3435

3536
[project.optional-dependencies]

src/mcp/server/fastmcp/server.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations as _annotations
44

5+
from email import message
56
import inspect
67
import json
78
import re
@@ -507,14 +508,22 @@ async def handle_sse(request: Request) -> None:
507508
streams[0],
508509
streams[1],
509510
self._mcp_server.create_initialization_options(),
510-
)
511+
)
512+
513+
@asynccontextmanager
514+
async def lifespan(app: Starlette):
515+
try:
516+
yield
517+
finally:
518+
await message_dispatch.close()
511519

512520
return Starlette(
513521
debug=self.settings.debug,
514522
routes=[
515523
Route(self.settings.sse_path, endpoint=handle_sse),
516524
Mount(self.settings.message_path, app=sse.handle_post_message),
517525
],
526+
lifespan=lifespan,
518527
)
519528

520529
async def list_prompts(self) -> list[MCPPrompt]:

src/mcp/server/message_queue/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ async def session_exists(self, session_id: UUID) -> bool:
5656
"""
5757
...
5858

59+
async def close(self) -> None:
60+
"""Close the message dispatch."""
61+
...
62+
5963

6064
class InMemoryMessageDispatch:
6165
"""Default in-memory implementation of the MessageDispatch interface.
@@ -106,3 +110,7 @@ async def subscribe(self, session_id: UUID, callback: MessageCallback):
106110
async def session_exists(self, session_id: UUID) -> bool:
107111
"""Check if a session exists."""
108112
return session_id in self._callbacks
113+
114+
async def close(self) -> None:
115+
"""Close the message dispatch."""
116+
pass

src/mcp/server/message_queue/redis.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ def __init__(
4848
self._limiter = CapacityLimiter(1)
4949
logger.debug(f"Redis message dispatch initialized: {redis_url}")
5050

51+
async def close(self):
52+
await self._pubsub.aclose() # type: ignore
53+
await self._redis.aclose() # type: ignore
54+
5155
def _session_channel(self, session_id: UUID) -> str:
5256
"""Get the Redis channel for a session."""
5357
return f"{self._prefix}session:{session_id.hex}"

src/mcp/server/sse.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ async def handle_sse(request):
4949
from mcp.server.message_queue import InMemoryMessageDispatch, MessageDispatch
5050

5151
logger = logging.getLogger(__name__)
52+
logging.basicConfig(level=logging.DEBUG)
5253

5354

5455
class SseServerTransport:
@@ -184,4 +185,4 @@ async def handle_post_message(
184185
logger.debug(f"Publishing message for session {session_id}: {message}")
185186
response = Response("Accepted", status_code=202)
186187
await response(scope, receive, send)
187-
await self._message_dispatch.publish_message(session_id, message)
188+
await self._message_dispatch.publish_message(session_id, message)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Message queue tests module

0 commit comments

Comments
 (0)