Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/everything/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,31 @@ const { server, cleanup } = createServer();
let transport: SSEServerTransport;

app.get("/sse", async (req, res) => {
console.log("Received connection");
console.error("Received connection");
transport = new SSEServerTransport("/message", res);
await server.connect(transport);

server.onclose = async () => {
await cleanup();
await server.close();
process.exit(0);
};

});

app.post("/message", async (req, res) => {
console.log("Received message");
console.error("Received message");

await transport.handlePostMessage(req, res);
});

process.on("SIGINT", async () => {
await cleanup();
await server.close();
process.exit(0);
});

const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`Server is running on port ${PORT}`);
console.error(`Server is running on port ${PORT}`);
});

22 changes: 11 additions & 11 deletions src/everything/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const { server, cleanup } = createServer();
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

app.post('/mcp', async (req: Request, res: Response) => {
console.log('Received MCP POST request');
console.error('Received MCP POST request');
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
Expand All @@ -31,7 +31,7 @@ app.post('/mcp', async (req: Request, res: Response) => {
onsessioninitialized: (sessionId) => {
// Store the transport by session ID when session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
console.error(`Session initialized with ID: ${sessionId}`);
transports[sessionId] = transport;
}
});
Expand All @@ -40,7 +40,7 @@ app.post('/mcp', async (req: Request, res: Response) => {
transport.onclose = () => {
const sid = transport.sessionId;
if (sid && transports[sid]) {
console.log(`Transport closed for session ${sid}, removing from transports map`);
console.error(`Transport closed for session ${sid}, removing from transports map`);
delete transports[sid];
}
};
Expand Down Expand Up @@ -85,7 +85,7 @@ app.post('/mcp', async (req: Request, res: Response) => {

// Handle GET requests for SSE streams (using built-in support from StreamableHTTP)
app.get('/mcp', async (req: Request, res: Response) => {
console.log('Received MCP GET request');
console.error('Received MCP GET request');
const sessionId = req.headers['mcp-session-id'] as string | undefined;
if (!sessionId || !transports[sessionId]) {
res.status(400).json({
Expand All @@ -102,9 +102,9 @@ app.get('/mcp', async (req: Request, res: Response) => {
// Check for Last-Event-ID header for resumability
const lastEventId = req.headers['last-event-id'] as string | undefined;
if (lastEventId) {
console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
console.error(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
} else {
console.log(`Establishing new SSE stream for session ${sessionId}`);
console.error(`Establishing new SSE stream for session ${sessionId}`);
}

const transport = transports[sessionId];
Expand All @@ -126,7 +126,7 @@ app.delete('/mcp', async (req: Request, res: Response) => {
return;
}

console.log(`Received session termination request for session ${sessionId}`);
console.error(`Received session termination request for session ${sessionId}`);

try {
const transport = transports[sessionId];
Expand All @@ -150,17 +150,17 @@ app.delete('/mcp', async (req: Request, res: Response) => {
// Start the server
const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`MCP Streamable HTTP Server listening on port ${PORT}`);
console.error(`MCP Streamable HTTP Server listening on port ${PORT}`);
});

// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
console.error('Shutting down server...');

// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
console.error(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
Expand All @@ -169,6 +169,6 @@ process.on('SIGINT', async () => {
}
await cleanup();
await server.close();
console.log('Server shutdown complete');
console.error('Server shutdown complete');
process.exit(0);
});