diff --git a/src/everything/sse.ts b/src/everything/sse.ts index 01794cddd2..a657af75be 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -6,36 +6,48 @@ console.error('Starting SSE server...'); const app = express(); -const { server, cleanup } = createServer(); - -let transport: SSEServerTransport; +const transports: Map = new Map(); app.get("/sse", async (req, res) => { - console.error("Received connection"); - transport = new SSEServerTransport("/message", res); - await server.connect(transport); - - server.onclose = async () => { - await cleanup(); - await server.close(); - }; + let transport: SSEServerTransport; + const { server, cleanup } = createServer(); + + if (req?.query?.sessionId) { + const sessionId = (req?.query?.sessionId as string); + transport = transports.get(sessionId) as SSEServerTransport; + console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId); + } else { + // Create and store transport for new session + transport = new SSEServerTransport("/message", res); + transports.set(transport.sessionId, transport); + + // Connect server to transport + await server.connect(transport); + console.error("Client Connected: ", transport.sessionId); + + // Handle close of connection + server.onclose = async () => { + console.error("Client Disconnected: ", transport.sessionId); + transports.delete(transport.sessionId); + await cleanup(); + }; + + } }); app.post("/message", async (req, res) => { - console.error("Received message"); - - await transport.handlePostMessage(req, res); -}); - -process.on("SIGINT", async () => { - await cleanup(); - await server.close(); - process.exit(0); + const sessionId = (req?.query?.sessionId as string); + const transport = transports.get(sessionId); + if (transport) { + console.error("Client Message from", sessionId); + await transport.handlePostMessage(req, res); + } else { + console.error(`No transport found for sessionId ${sessionId}`) + } }); const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.error(`Server is running on port ${PORT}`); }); - diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 9ea3a8fc14..ccd3ad8a69 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -8,9 +8,7 @@ console.error('Starting Streamable HTTP server...'); const app = express(); -const { server, cleanup } = createServer(); - -const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; +const transports: Map = new Map(); app.post('/mcp', async (req: Request, res: Response) => { console.error('Received MCP POST request'); @@ -19,10 +17,13 @@ app.post('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; let transport: StreamableHTTPServerTransport; - if (sessionId && transports[sessionId]) { + if (sessionId && transports.has(sessionId)) { // Reuse existing transport - transport = transports[sessionId]; + transport = transports.get(sessionId)!; } else if (!sessionId) { + + const { server, cleanup } = createServer(); + // New initialization request const eventStore = new InMemoryEventStore(); transport = new StreamableHTTPServerTransport({ @@ -32,16 +33,18 @@ app.post('/mcp', async (req: Request, res: Response) => { // Store the transport by session ID when session is initialized // This avoids race conditions where requests might come in before the session is stored console.error(`Session initialized with ID: ${sessionId}`); - transports[sessionId] = transport; + transports.set(sessionId, transport); } }); + // Set up onclose handler to clean up transport when closed - transport.onclose = () => { + server.onclose = async () => { const sid = transport.sessionId; - if (sid && transports[sid]) { + if (sid && transports.has(sid)) { console.error(`Transport closed for session ${sid}, removing from transports map`); - delete transports[sid]; + transports.delete(sid); + await cleanup(); } }; @@ -87,7 +90,7 @@ app.post('/mcp', async (req: Request, res: Response) => { app.get('/mcp', async (req: Request, res: Response) => { console.error('Received MCP GET request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -107,14 +110,14 @@ app.get('/mcp', async (req: Request, res: Response) => { console.error(`Establishing new SSE stream for session ${sessionId}`); } - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); }); // Handle DELETE requests for session termination (according to MCP spec) app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -129,8 +132,8 @@ app.delete('/mcp', async (req: Request, res: Response) => { console.error(`Received session termination request for session ${sessionId}`); try { - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); } catch (error) { console.error('Error handling session termination:', error); if (!res.headersSent) { @@ -161,14 +164,13 @@ process.on('SIGINT', async () => { for (const sessionId in transports) { try { console.error(`Closing transport for session ${sessionId}`); - await transports[sessionId].close(); - delete transports[sessionId]; + await transports.get(sessionId)!.close(); + transports.delete(sessionId); } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await cleanup(); - await server.close(); + console.error('Server shutdown complete'); process.exit(0); });