Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 60 additions & 46 deletions server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ const serverTransports: Map<string, Transport> = new Map<string, Transport>(); /

const createTransport = async (req: express.Request): Promise<Transport> => {
const query = req.query;
console.log("Query parameters:", query);
console.log("Query parameters:", JSON.stringify(query));

const transportType = query.transportType as string;

Expand All @@ -103,7 +103,7 @@ const createTransport = async (req: express.Request): Promise<Transport> => {

const { cmd, args } = findActualExecutable(command, origArgs);

console.log(`Stdio transport: command=${cmd}, args=${args}`);
console.log(`STDIO transport: command=${cmd}, args=${args}`);

const transport = new StdioClientTransport({
command: cmd,
Expand All @@ -113,8 +113,6 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
});

await transport.start();

console.log("Spawned stdio transport");
return transport;
} else if (transportType === "sse") {
const url = query.url as string;
Expand All @@ -132,8 +130,6 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
},
});
await transport.start();

console.log("Connected to SSE transport");
return transport;
} else if (transportType === "streamable-http") {
const headers = getHttpHeaders(req, transportType);
Expand All @@ -147,7 +143,6 @@ const createTransport = async (req: express.Request): Promise<Transport> => {
},
);
await transport.start();
console.log("Connected to Streamable HTTP transport");
return transport;
} else {
console.error(`Invalid transport type: ${transportType}`);
Expand Down Expand Up @@ -176,11 +171,10 @@ app.get("/mcp", async (req, res) => {

app.post("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
console.log(`Received POST message for sessionId ${sessionId}`);
let serverTransport: Transport | undefined;
if (!sessionId) {
try {
console.log("New streamable-http connection");
console.log("New StreamableHttp connection request");
try {
serverTransport = await createTransport(req);
} catch (error) {
Expand All @@ -196,16 +190,17 @@ app.post("/mcp", async (req, res) => {
throw error;
}

console.log("Connected MCP client to server transport");
console.log("Created StreamableHttp server transport");

const webAppTransport = new StreamableHTTPServerTransport({
sessionIdGenerator: randomUUID,
onsessioninitialized: (sessionId) => {
webAppTransports.set(sessionId, webAppTransport);
serverTransports.set(sessionId, serverTransport!);
console.log("Created streamable web app transport " + sessionId);
console.log("Client <-> Proxy sessionId: " + sessionId);
},
});
console.log("Created StreamableHttp client transport");

await webAppTransport.start();

Expand All @@ -224,6 +219,7 @@ app.post("/mcp", async (req, res) => {
res.status(500).json(error);
}
} else {
console.log(`Received POST message for sessionId ${sessionId}`);
try {
const transport = webAppTransports.get(
sessionId,
Expand Down Expand Up @@ -272,15 +268,15 @@ app.delete("/mcp", async (req, res) => {

app.get("/stdio", async (req, res) => {
try {
console.log("New connection");
console.log("New STDIO connection request");
let serverTransport: Transport | undefined;
try {
serverTransport = await createTransport(req);
console.log("Created server transport");
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
"Received 401 Unauthorized from MCP server. Authentication failure.",
);
res.status(401).json(error);
return;
Expand All @@ -289,31 +285,43 @@ app.get("/stdio", async (req, res) => {
throw error;
}

console.log("Connected MCP client to backing server transport");

const webAppTransport = new SSEServerTransport("/message", res);
console.log("Created client transport");

webAppTransports.set(webAppTransport.sessionId, webAppTransport);
serverTransports.set(webAppTransport.sessionId, serverTransport);
console.log("Created client/server transports");

await webAppTransport.start();

(serverTransport as StdioClientTransport).stderr!.on("data", (chunk) => {
webAppTransport.send({
jsonrpc: "2.0",
method: "notifications/stderr",
params: {
content: chunk.toString(),
},
});
if (chunk.toString().includes("MODULE_NOT_FOUND")) {
webAppTransport.send({
jsonrpc: "2.0",
method: "notifications/stderr",
params: {
content: "Command not found, transports removed",
},
});
webAppTransport.close();
serverTransport.close();
webAppTransports.delete(webAppTransport.sessionId);
serverTransports.delete(webAppTransport.sessionId);
console.error("Command not found, transports removed");
} else {
webAppTransport.send({
jsonrpc: "2.0",
method: "notifications/stderr",
params: {
content: chunk.toString(),
},
});
}
});

mcpProxy({
transportToClient: webAppTransport,
transportToServer: serverTransport,
});

console.log("Set up MCP proxy");
} catch (error) {
console.error("Error in /stdio route:", error);
res.status(500).json(error);
Expand All @@ -323,40 +331,46 @@ app.get("/stdio", async (req, res) => {
app.get("/sse", async (req, res) => {
try {
console.log(
"New SSE connection. NOTE: The sse transport is deprecated and has been replaced by streamable-http",
"New SSE connection request. NOTE: The sse transport is deprecated and has been replaced by StreamableHttp",
);
let serverTransport: Transport | undefined;
try {
serverTransport = await createTransport(req);
} catch (error) {
if (error instanceof SseError && error.code === 401) {
console.error(
"Received 401 Unauthorized from MCP server:",
error.message,
"Received 401 Unauthorized from MCP server. Authentication failure.",
);
res.status(401).json(error);
return;
} else if (error instanceof SseError && error.code === 404) {
console.error(
"Received 404 not found from MCP server. Does the MCP server support SSE?",
);
res.status(404).json(error);
return;
} else if (JSON.stringify(error).includes("ECONNREFUSED")) {
console.error("Connection refused. Is the MCP server running?");
res.status(500).json(error);
} else {
throw error;
}

throw error;
}

console.log("Connected MCP client to backing server transport");
if (serverTransport) {
const webAppTransport = new SSEServerTransport("/message", res);
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
console.log("Created client transport");
serverTransports.set(webAppTransport.sessionId, serverTransport!);
console.log("Created server transport");

const webAppTransport = new SSEServerTransport("/message", res);
webAppTransports.set(webAppTransport.sessionId, webAppTransport);
console.log("Created client transport");
serverTransports.set(webAppTransport.sessionId, serverTransport);
console.log("Created server transport");

await webAppTransport.start();

mcpProxy({
transportToClient: webAppTransport,
transportToServer: serverTransport,
});
await webAppTransport.start();

console.log("Set up MCP proxy");
mcpProxy({
transportToClient: webAppTransport,
transportToServer: serverTransport,
});
}
} catch (error) {
console.error("Error in /sse route:", error);
res.status(500).json(error);
Expand All @@ -366,7 +380,7 @@ app.get("/sse", async (req, res) => {
app.post("/message", async (req, res) => {
try {
const sessionId = req.query.sessionId;
console.log(`Received message for sessionId ${sessionId}`);
console.log(`Received POST message for sessionId ${sessionId}`);

const transport = webAppTransports.get(
sessionId as string,
Expand Down
21 changes: 20 additions & 1 deletion server/src/mcpProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,15 @@ function onClientError(error: Error) {
}

function onServerError(error: Error) {
console.error("Error from MCP server:", error);
if (
(error?.message &&
error.message.includes("Error POSTing to endpoint (HTTP 404)")) ||
(error?.cause && JSON.stringify(error.cause).includes("ECONNREFUSED"))
) {
console.error("Connection refused. Is the MCP server running?");
} else {
console.error("Error from MCP server:", error);
}
}

export default function mcpProxy({
Expand All @@ -19,6 +27,8 @@ export default function mcpProxy({
let transportToClientClosed = false;
let transportToServerClosed = false;

let reportedServerSession = false;

transportToClient.onmessage = (message) => {
transportToServer.send(message).catch((error) => {
// Send error response back to client if it was a request (has id) and connection is still open
Expand All @@ -38,6 +48,15 @@ export default function mcpProxy({
};

transportToServer.onmessage = (message) => {
if (!reportedServerSession) {
if (transportToServer.sessionId) {
// Can only report for StreamableHttp
console.error(
"Proxy <-> Server sessionId: " + transportToServer.sessionId,
);
}
reportedServerSession = true;
}
transportToClient.send(message).catch(onClientError);
};

Expand Down