From 145f893108cce6fb7a67199f91501a725aa33efc Mon Sep 17 00:00:00 2001 From: cliffhall Date: Thu, 22 May 2025 11:41:16 -0400 Subject: [PATCH 1/4] Allow multiple connections to the `everything` server. For both sse and streamableHttp, a server instance needs to be created for each transport. Otherwise, when a new client connects and its new transport is connected to the single server, the previous transport is overwritten in the server instance and can no longer communicate. * In sse.ts - remove global server, cleanup, and transport vars - add transports map - in sse GET handler, - check for sessionId, there shouldn't be one, so comment "Reconnecting?" and do nothing if present - if sessionId not present - create new server and transport instance - connect server to transport - add transport to transports map - in server.onclose, delete the transport from the transports map and call cleanup - in /message POST handler - get the sessionId from the request - get the transport from the map by sessionId - handle the message if the transport was found * In streamableHttp.ts - remove the global server and cleanup vars - change transports var to Map - in /mcp POST handler - when creating a new session - create a server instance - in server.onclose, delete the transport from the transports map and call cleanup - remove the calls to cleanup and server.close in the SIGINT handler, because the transport is closed and its onclose handler closes the server. --- src/everything/sse.ts | 52 +++++++++++++++++++------------- src/everything/streamableHttp.ts | 40 ++++++++++++------------ 2 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/everything/sse.ts b/src/everything/sse.ts index 01794cddd2..a3eddaa500 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -6,36 +6,46 @@ console.error('Starting SSE server...'); const app = express(); -const { server, cleanup } = createServer(); - -let transport: SSEServerTransport; +const transports: Map = new Map(); app.get("/sse", async (req, res) => { - console.error("Received connection"); - transport = new SSEServerTransport("/message", res); - await server.connect(transport); - - server.onclose = async () => { - await cleanup(); - await server.close(); - }; + let transport: SSEServerTransport; + const { server, cleanup } = createServer(); + + if (req?.query?.sessionId) { + const sessionId = (req?.query?.sessionId as string) || "none"; + transport = transports.get(sessionId) as SSEServerTransport; + console.error("Client Reconnecting? ", transport.sessionId); + } else { + // Create and store transport for new session + transport = new SSEServerTransport("/message", res); + transports.set(transport.sessionId, transport); + + // Connect server to transport + await server.connect(transport); + console.error("Client Connected: ", transport.sessionId); + + // Handle close of connection + server.onclose = async () => { + console.error("Client Disconnected: ", transport.sessionId); + transports.delete(transport.sessionId); + await cleanup(); + }; + + } }); app.post("/message", async (req, res) => { - console.error("Received message"); - - await transport.handlePostMessage(req, res); -}); - -process.on("SIGINT", async () => { - await cleanup(); - await server.close(); - process.exit(0); + const sessionId = (req?.query?.sessionId as string) || "none"; + const transport = transports.get(sessionId); + if (transport) { + console.error("Client Message from", sessionId); + await transport.handlePostMessage(req, res); + } }); const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.error(`Server is running on port ${PORT}`); }); - diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 9ea3a8fc14..ccd3ad8a69 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -8,9 +8,7 @@ console.error('Starting Streamable HTTP server...'); const app = express(); -const { server, cleanup } = createServer(); - -const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; +const transports: Map = new Map(); app.post('/mcp', async (req: Request, res: Response) => { console.error('Received MCP POST request'); @@ -19,10 +17,13 @@ app.post('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; let transport: StreamableHTTPServerTransport; - if (sessionId && transports[sessionId]) { + if (sessionId && transports.has(sessionId)) { // Reuse existing transport - transport = transports[sessionId]; + transport = transports.get(sessionId)!; } else if (!sessionId) { + + const { server, cleanup } = createServer(); + // New initialization request const eventStore = new InMemoryEventStore(); transport = new StreamableHTTPServerTransport({ @@ -32,16 +33,18 @@ app.post('/mcp', async (req: Request, res: Response) => { // Store the transport by session ID when session is initialized // This avoids race conditions where requests might come in before the session is stored console.error(`Session initialized with ID: ${sessionId}`); - transports[sessionId] = transport; + transports.set(sessionId, transport); } }); + // Set up onclose handler to clean up transport when closed - transport.onclose = () => { + server.onclose = async () => { const sid = transport.sessionId; - if (sid && transports[sid]) { + if (sid && transports.has(sid)) { console.error(`Transport closed for session ${sid}, removing from transports map`); - delete transports[sid]; + transports.delete(sid); + await cleanup(); } }; @@ -87,7 +90,7 @@ app.post('/mcp', async (req: Request, res: Response) => { app.get('/mcp', async (req: Request, res: Response) => { console.error('Received MCP GET request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -107,14 +110,14 @@ app.get('/mcp', async (req: Request, res: Response) => { console.error(`Establishing new SSE stream for session ${sessionId}`); } - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); }); // Handle DELETE requests for session termination (according to MCP spec) app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -129,8 +132,8 @@ app.delete('/mcp', async (req: Request, res: Response) => { console.error(`Received session termination request for session ${sessionId}`); try { - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); } catch (error) { console.error('Error handling session termination:', error); if (!res.headersSent) { @@ -161,14 +164,13 @@ process.on('SIGINT', async () => { for (const sessionId in transports) { try { console.error(`Closing transport for session ${sessionId}`); - await transports[sessionId].close(); - delete transports[sessionId]; + await transports.get(sessionId)!.close(); + transports.delete(sessionId); } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await cleanup(); - await server.close(); + console.error('Server shutdown complete'); process.exit(0); }); From 9ffacb78abbb607e9fb8903e74c13e532edd8615 Mon Sep 17 00:00:00 2001 From: cliffhall Date: Wed, 28 May 2025 11:13:54 -0400 Subject: [PATCH 2/4] Be more verbose about unexpected GET /sse calls from already connected clients. --- src/everything/sse.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/everything/sse.ts b/src/everything/sse.ts index a3eddaa500..671c2c171d 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -15,7 +15,7 @@ app.get("/sse", async (req, res) => { if (req?.query?.sessionId) { const sessionId = (req?.query?.sessionId as string) || "none"; transport = transports.get(sessionId) as SSEServerTransport; - console.error("Client Reconnecting? ", transport.sessionId); + console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId); } else { // Create and store transport for new session transport = new SSEServerTransport("/message", res); From 2da9f339694c5098ddc4355ac0f9410a46a14344 Mon Sep 17 00:00:00 2001 From: cliffhall Date: Wed, 28 May 2025 16:31:21 -0400 Subject: [PATCH 3/4] Remove or clause for sessionId where "none" is offered as an alternative. In the first case (line 16) we already know that req.query.sessionId is set to something. I n the second (line 40), it doesn't matter because if it doesn't map to a transport no further action is taken. --- src/everything/sse.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/everything/sse.ts b/src/everything/sse.ts index 671c2c171d..3ee623c886 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -13,7 +13,7 @@ app.get("/sse", async (req, res) => { const { server, cleanup } = createServer(); if (req?.query?.sessionId) { - const sessionId = (req?.query?.sessionId as string) || "none"; + const sessionId = (req?.query?.sessionId as string); transport = transports.get(sessionId) as SSEServerTransport; console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId); } else { @@ -37,7 +37,7 @@ app.get("/sse", async (req, res) => { }); app.post("/message", async (req, res) => { - const sessionId = (req?.query?.sessionId as string) || "none"; + const sessionId = (req?.query?.sessionId as string); const transport = transports.get(sessionId); if (transport) { console.error("Client Message from", sessionId); From 3adf59409c1b8fda19cca36b96774e3cedfb909b Mon Sep 17 00:00:00 2001 From: cliffhall Date: Wed, 28 May 2025 17:29:42 -0400 Subject: [PATCH 4/4] In sse.ts, /message endpoint, if transport isn't found for the given sessionId, output a "No transport found for sessionId" message. --- src/everything/sse.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/everything/sse.ts b/src/everything/sse.ts index 3ee623c886..a657af75be 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -42,6 +42,8 @@ app.post("/message", async (req, res) => { if (transport) { console.error("Client Message from", sessionId); await transport.handlePostMessage(req, res); + } else { + console.error(`No transport found for sessionId ${sessionId}`) } });