Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions src/everything/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,48 @@ console.error('Starting SSE server...');

const app = express();

const { server, cleanup } = createServer();

let transport: SSEServerTransport;
const transports: Map<string, SSEServerTransport> = new Map<string, SSEServerTransport>();

app.get("/sse", async (req, res) => {
console.error("Received connection");
transport = new SSEServerTransport("/message", res);
await server.connect(transport);

server.onclose = async () => {
await cleanup();
await server.close();
};
let transport: SSEServerTransport;
const { server, cleanup } = createServer();

if (req?.query?.sessionId) {
const sessionId = (req?.query?.sessionId as string);
transport = transports.get(sessionId) as SSEServerTransport;
console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId);
} else {
// Create and store transport for new session
transport = new SSEServerTransport("/message", res);
transports.set(transport.sessionId, transport);

// Connect server to transport
await server.connect(transport);
console.error("Client Connected: ", transport.sessionId);

// Handle close of connection
server.onclose = async () => {
console.error("Client Disconnected: ", transport.sessionId);
transports.delete(transport.sessionId);
await cleanup();
};

}

});

app.post("/message", async (req, res) => {
console.error("Received message");

await transport.handlePostMessage(req, res);
});

process.on("SIGINT", async () => {
await cleanup();
await server.close();
process.exit(0);
const sessionId = (req?.query?.sessionId as string);
const transport = transports.get(sessionId);
if (transport) {
console.error("Client Message from", sessionId);
await transport.handlePostMessage(req, res);
} else {
console.error(`No transport found for sessionId ${sessionId}`)
}
});

const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.error(`Server is running on port ${PORT}`);
});

40 changes: 21 additions & 19 deletions src/everything/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ console.error('Starting Streamable HTTP server...');

const app = express();

const { server, cleanup } = createServer();

const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
const transports: Map<string, StreamableHTTPServerTransport> = new Map<string, StreamableHTTPServerTransport>();

app.post('/mcp', async (req: Request, res: Response) => {
console.error('Received MCP POST request');
Expand All @@ -19,10 +17,13 @@ app.post('/mcp', async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;

if (sessionId && transports[sessionId]) {
if (sessionId && transports.has(sessionId)) {
// Reuse existing transport
transport = transports[sessionId];
transport = transports.get(sessionId)!;
} else if (!sessionId) {

const { server, cleanup } = createServer();

// New initialization request
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
Expand All @@ -32,16 +33,18 @@ app.post('/mcp', async (req: Request, res: Response) => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.error(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
transports.set(sessionId, transport);
}
});


// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
server.onclose = async () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
if (sid && transports.has(sid)) {
console.error(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
transports.delete(sid);
await cleanup();
}
};

Expand Down Expand Up @@ -87,7 +90,7 @@ app.post('/mcp', async (req: Request, res: Response) => {
app.get('/mcp', async (req: Request, res: Response) => {
console.error('Received MCP GET request');
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
if (!sessionId || !transports.has(sessionId)) {
res.status(400).json({
jsonrpc: '2.0',
error: {
Expand All @@ -107,14 +110,14 @@ app.get('/mcp', async (req: Request, res: Response) => {
console.error(`Establishing new SSE stream for session ${sessionId}`);
}

const transport = transports[sessionId];
await transport.handleRequest(req, res);
const transport = transports.get(sessionId);
await transport!.handleRequest(req, res);
});

// Handle DELETE requests for session termination (according to MCP spec)
app.delete('/mcp', async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
if (!sessionId || !transports.has(sessionId)) {
res.status(400).json({
jsonrpc: '2.0',
error: {
Expand All @@ -129,8 +132,8 @@ app.delete('/mcp', async (req: Request, res: Response) => {
console.error(`Received session termination request for session ${sessionId}`);

try {
const transport = transports[sessionId];
await transport.handleRequest(req, res);
const transport = transports.get(sessionId);
await transport!.handleRequest(req, res);
} catch (error) {
console.error('Error handling session termination:', error);
if (!res.headersSent) {
Expand Down Expand Up @@ -161,14 +164,13 @@ process.on('SIGINT', async () => {
for (const sessionId in transports) {
try {
console.error(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
await transports.get(sessionId)!.close();
transports.delete(sessionId);
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
await cleanup();
await server.close();

console.error('Server shutdown complete');
process.exit(0);
});