From 07af15945dd722744d27e6da12c4c1c1e2fa0182 Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Fri, 18 Apr 2025 18:25:54 -0700 Subject: [PATCH 1/6] add streamableHttp server support for everything server --- src/everything/package.json | 5 +- src/everything/streamableHttp.ts | 151 +++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 src/everything/streamableHttp.ts diff --git a/src/everything/package.json b/src/everything/package.json index bfcbfae4c9..ffe6b7cd68 100644 --- a/src/everything/package.json +++ b/src/everything/package.json @@ -18,10 +18,11 @@ "prepare": "npm run build", "watch": "tsc --watch", "start": "node dist/index.js", - "start:sse": "node dist/sse.js" + "start:sse": "node dist/sse.js", + "start:streamableHttp": "node dist/streamableHttp.js" }, "dependencies": { - "@modelcontextprotocol/sdk": "^1.9.0", + "@modelcontextprotocol/sdk": "^1.10.1", "express": "^4.21.1", "zod": "^3.23.8", "zod-to-json-schema": "^3.23.5" diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts new file mode 100644 index 0000000000..4267b2f445 --- /dev/null +++ b/src/everything/streamableHttp.ts @@ -0,0 +1,151 @@ +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; +import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js'; +import express, { Request, Response } from "express"; +import { createServer } from "./everything.js"; +import { randomUUID } from 'node:crypto'; + +const app = express(); + +app.use(express.json()); + +const { server, cleanup } = createServer(); + +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +app.post('/mcp', async (req: Request, res: Response) => { + console.log('Received MCP request:', req.body); + try { + // Check for existing session ID + const sessionId = req.headers['mcp-session-id'] as string | undefined; + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + // Reuse existing transport + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + // New initialization request + const eventStore = new InMemoryEventStore(); + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + eventStore, // Enable resumability + onsessioninitialized: (sessionId) => { + // Store the transport by session ID when session is initialized + // This avoids race conditions where requests might come in before the session is stored + console.log(`Session initialized with ID: ${sessionId}`); + transports[sessionId] = transport; + } + }); + + // Set up onclose handler to clean up transport when closed + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && transports[sid]) { + console.log(`Transport closed for session ${sid}, removing from transports map`); + delete transports[sid]; + } + }; + + // Connect the transport to the MCP server BEFORE handling the request + // so responses can flow back through the same transport + await server.connect(transport); + + await transport.handleRequest(req, res, req.body); + return; // Already handled + } else { + // Invalid request - no session ID or not initialization request + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); + return; + } + + // Handle the request with existing transport - no need to reconnect + // The existing transport is already connected to the server + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Internal server error', + }, + id: null, + }); + } + } +}); + +// Handle GET requests for SSE streams (using built-in support from StreamableHTTP) +app.get('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + // Check for Last-Event-ID header for resumability + const lastEventId = req.headers['last-event-id'] as string | undefined; + if (lastEventId) { + console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`); + } else { + console.log(`Establishing new SSE stream for session ${sessionId}`); + } + + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + +// Handle DELETE requests for session termination (according to MCP spec) +app.delete('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + console.log(`Received session termination request for session ${sessionId}`); + + try { + const transport = transports[sessionId]; + await transport.handleRequest(req, res); + } catch (error) { + console.error('Error handling session termination:', error); + if (!res.headersSent) { + res.status(500).send('Error processing session termination'); + } + } +}); + +// Start the server +const PORT = process.env.PORT || 3001; +app.listen(PORT, () => { + console.log(`MCP Streamable HTTP Server listening on port ${PORT}`); +}); + +// Handle server shutdown +process.on('SIGINT', async () => { + console.log('Shutting down server...'); + + // Close all active transports to properly clean up resources + for (const sessionId in transports) { + try { + console.log(`Closing transport for session ${sessionId}`); + await transports[sessionId].close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing transport for session ${sessionId}:`, error); + } + } + await cleanup(); + await server.close(); + console.log('Server shutdown complete'); + process.exit(0); +}); From ace5c2a8dd2ffd85bf53a070e7f0767f949257fb Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Fri, 18 Apr 2025 23:56:50 -0700 Subject: [PATCH 2/6] update docs --- src/everything/README.md | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/everything/README.md b/src/everything/README.md index 236ac59660..f356d68796 100644 --- a/src/everything/README.md +++ b/src/everything/README.md @@ -126,7 +126,7 @@ The server sends random-leveled log messages every 15 seconds, e.g.: } ``` -## Usage with Claude Desktop +## Usage with Claude Desktop (uses [stdio Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#stdio)) Add to your `claude_desktop_config.json`: @@ -171,4 +171,18 @@ Optionally, you can add it to a file called `.vscode/mcp.json` in your workspace } } } +## Run with [HTTP+SSE Transport](https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse) (deprecated as of [2025-03-26](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports)) + +```shell +cd src/everything +npm install +npm run start:sse +``` + +## Run with [Streamable HTTP Transport](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http) + +```shell +cd src/everything +npm install +npm run start:streamableHttp ``` From 03e9a7be0dd5abfb023b4ea9e9e39a493b732a11 Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Sat, 19 Apr 2025 13:04:29 -0700 Subject: [PATCH 3/6] ref: cleanup --- src/everything/README.md | 2 ++ src/everything/streamableHttp.ts | 31 ++++++++++++++++++++++++++----- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/everything/README.md b/src/everything/README.md index f356d68796..4d51de51a3 100644 --- a/src/everything/README.md +++ b/src/everything/README.md @@ -171,6 +171,8 @@ Optionally, you can add it to a file called `.vscode/mcp.json` in your workspace } } } +``` + ## Run with [HTTP+SSE Transport](https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse) (deprecated as of [2025-03-26](https://modelcontextprotocol.io/specification/2025-03-26/basic/transports)) ```shell diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 4267b2f445..69807d6e61 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -50,7 +50,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // so responses can flow back through the same transport await server.connect(transport); - await transport.handleRequest(req, res, req.body); + await transport.handleRequest(req, res); return; // Already handled } else { // Invalid request - no session ID or not initialization request @@ -67,7 +67,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // Handle the request with existing transport - no need to reconnect // The existing transport is already connected to the server - await transport.handleRequest(req, res, req.body); + await transport.handleRequest(req, res); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { @@ -87,7 +87,14 @@ app.post('/mcp', async (req: Request, res: Response) => { app.get('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports[sessionId]) { - res.status(400).send('Invalid or missing session ID'); + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); return; } @@ -107,7 +114,14 @@ app.get('/mcp', async (req: Request, res: Response) => { app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports[sessionId]) { - res.status(400).send('Invalid or missing session ID'); + res.status(400).json({ + jsonrpc: '2.0', + error: { + code: -32000, + message: 'Bad Request: No valid session ID provided', + }, + id: null, + }); return; } @@ -119,7 +133,14 @@ app.delete('/mcp', async (req: Request, res: Response) => { } catch (error) { console.error('Error handling session termination:', error); if (!res.headersSent) { - res.status(500).send('Error processing session termination'); + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: 'Error handling session termination', + }, + id: null, + }); } } }); From d1d79444f7bf791d153a7587dbb6924b5851c49b Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Sat, 19 Apr 2025 16:21:41 -0700 Subject: [PATCH 4/6] fix: passing body to handleRequest, and optionally adding a response id if it exists --- src/everything/streamableHttp.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 69807d6e61..0c4bbd2d06 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -50,7 +50,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // so responses can flow back through the same transport await server.connect(transport); - await transport.handleRequest(req, res); + await transport.handleRequest(req, res, req.body); return; // Already handled } else { // Invalid request - no session ID or not initialization request @@ -60,14 +60,14 @@ app.post('/mcp', async (req: Request, res: Response) => { code: -32000, message: 'Bad Request: No valid session ID provided', }, - id: null, + id: req?.body?.id, }); return; } // Handle the request with existing transport - no need to reconnect // The existing transport is already connected to the server - await transport.handleRequest(req, res); + await transport.handleRequest(req, res, req.body); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { @@ -77,8 +77,9 @@ app.post('/mcp', async (req: Request, res: Response) => { code: -32603, message: 'Internal server error', }, - id: null, + id: req?.body?.id, }); + return; } } }); @@ -93,7 +94,7 @@ app.get('/mcp', async (req: Request, res: Response) => { code: -32000, message: 'Bad Request: No valid session ID provided', }, - id: null, + id: req?.body?.id, }); return; } @@ -120,7 +121,7 @@ app.delete('/mcp', async (req: Request, res: Response) => { code: -32000, message: 'Bad Request: No valid session ID provided', }, - id: null, + id: req?.body?.id, }); return; } @@ -139,8 +140,9 @@ app.delete('/mcp', async (req: Request, res: Response) => { code: -32603, message: 'Error handling session termination', }, - id: null, + id: req?.body?.id, }); + return; } } }); From 7e602b09763d48837d3d48e717b69fe9a5095bd6 Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Sat, 19 Apr 2025 16:43:15 -0700 Subject: [PATCH 5/6] update package.lock --- package-lock.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index aa8137c3bc..23e61d7022 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5435,7 +5435,7 @@ "version": "0.6.2", "license": "MIT", "dependencies": { - "@modelcontextprotocol/sdk": "^1.9.0", + "@modelcontextprotocol/sdk": "^1.10.1", "express": "^4.21.1", "zod": "^3.23.8", "zod-to-json-schema": "^3.23.5" @@ -5450,9 +5450,9 @@ } }, "src/everything/node_modules/@modelcontextprotocol/sdk": { - "version": "1.9.0", - "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.9.0.tgz", - "integrity": "sha512-Jq2EUCQpe0iyO5FGpzVYDNFR6oR53AIrwph9yWl7uSc7IWUMsrmpmSaTGra5hQNunXpM+9oit85p924jWuHzUA==", + "version": "1.10.1", + "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.10.1.tgz", + "integrity": "sha512-xNYdFdkJqEfIaTVP1gPKoEvluACHZsHZegIoICX8DM1o6Qf3G5u2BQJHmgd0n4YgRPqqK/u1ujQvrgAxxSJT9w==", "license": "MIT", "dependencies": { "content-type": "^1.0.5", From e70bcd317b4dd00181b4978816c6e83df28e7a34 Mon Sep 17 00:00:00 2001 From: Shiv Deepak Muddada Date: Tue, 22 Apr 2025 19:14:12 -0700 Subject: [PATCH 6/6] remove json middleware from everything streamable http server --- src/everything/streamableHttp.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 0c4bbd2d06..3a87bc8332 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -1,5 +1,4 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; -import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; import { InMemoryEventStore } from '@modelcontextprotocol/sdk/examples/shared/inMemoryEventStore.js'; import express, { Request, Response } from "express"; import { createServer } from "./everything.js"; @@ -7,14 +6,12 @@ import { randomUUID } from 'node:crypto'; const app = express(); -app.use(express.json()); - const { server, cleanup } = createServer(); const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; app.post('/mcp', async (req: Request, res: Response) => { - console.log('Received MCP request:', req.body); + console.log('Received MCP POST request'); try { // Check for existing session ID const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -23,7 +20,7 @@ app.post('/mcp', async (req: Request, res: Response) => { if (sessionId && transports[sessionId]) { // Reuse existing transport transport = transports[sessionId]; - } else if (!sessionId && isInitializeRequest(req.body)) { + } else if (!sessionId) { // New initialization request const eventStore = new InMemoryEventStore(); transport = new StreamableHTTPServerTransport({ @@ -50,7 +47,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // so responses can flow back through the same transport await server.connect(transport); - await transport.handleRequest(req, res, req.body); + await transport.handleRequest(req, res); return; // Already handled } else { // Invalid request - no session ID or not initialization request @@ -67,7 +64,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // Handle the request with existing transport - no need to reconnect // The existing transport is already connected to the server - await transport.handleRequest(req, res, req.body); + await transport.handleRequest(req, res); } catch (error) { console.error('Error handling MCP request:', error); if (!res.headersSent) { @@ -86,6 +83,7 @@ app.post('/mcp', async (req: Request, res: Response) => { // Handle GET requests for SSE streams (using built-in support from StreamableHTTP) app.get('/mcp', async (req: Request, res: Response) => { + console.log('Received MCP GET request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; if (!sessionId || !transports[sessionId]) { res.status(400).json({