-
Notifications
You must be signed in to change notification settings - Fork 1.5k
StreamableHttp transport - backwards compatibility examples #347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
4d2968c
backward compatible server example
ihrpr 8328cfe
example of sse server
ihrpr 80b0bab
backwards compatible client
ihrpr 7341c9a
fix lint
ihrpr 9e60411
Merge branch 'main' into ihrpr/backwards_compatibility
ihrpr 29ae6b8
appying suggested changes
ihrpr File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
188 changes: 188 additions & 0 deletions
188
src/examples/client/streamableHttpWithSseFallbackClient.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,188 @@ | ||
| import { Client } from '../../client/index.js'; | ||
| import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; | ||
| import { SSEClientTransport } from '../../client/sse.js'; | ||
| import { | ||
| ListToolsRequest, | ||
| ListToolsResultSchema, | ||
| CallToolRequest, | ||
| CallToolResultSchema, | ||
| LoggingMessageNotificationSchema, | ||
| } from '../../types.js'; | ||
|
|
||
| /** | ||
| * Simplified Backwards Compatible MCP Client | ||
| * | ||
| * This client demonstrates backward compatibility with both: | ||
| * 1. Modern servers using Streamable HTTP transport (protocol version 2025-03-26) | ||
| * 2. Older servers using HTTP+SSE transport (protocol version 2024-11-05) | ||
| * | ||
| * Following the MCP specification for backwards compatibility: | ||
| * - Attempts to POST an initialize request to the server URL first (modern transport) | ||
| * - If that fails with 4xx status, falls back to GET request for SSE stream (older transport) | ||
| */ | ||
|
|
||
| // Command line args processing | ||
| const args = process.argv.slice(2); | ||
| const serverUrl = args[0] || 'http://localhost:3000/mcp'; | ||
|
|
||
| async function main(): Promise<void> { | ||
| console.log('MCP Backwards Compatible Client'); | ||
| console.log('==============================='); | ||
| console.log(`Connecting to server at: ${serverUrl}`); | ||
|
|
||
| let client: Client; | ||
| let transport: StreamableHTTPClientTransport | SSEClientTransport; | ||
|
|
||
| try { | ||
| // Try connecting with automatic transport detection | ||
| const connection = await connectWithBackwardsCompatibility(serverUrl); | ||
| client = connection.client; | ||
| transport = connection.transport; | ||
|
|
||
| // Set up notification handler | ||
| client.setNotificationHandler(LoggingMessageNotificationSchema, (notification) => { | ||
| console.log(`Notification: ${notification.params.level} - ${notification.params.data}`); | ||
| }); | ||
|
|
||
| // DEMO WORKFLOW: | ||
| // 1. List available tools | ||
| console.log('\n=== Listing Available Tools ==='); | ||
| await listTools(client); | ||
|
|
||
| // 2. Call the notification tool | ||
| console.log('\n=== Starting Notification Stream ==='); | ||
| await startNotificationTool(client); | ||
|
|
||
| // 3. Wait for all notifications (5 seconds) | ||
| console.log('\n=== Waiting for all notifications ==='); | ||
| await new Promise(resolve => setTimeout(resolve, 5000)); | ||
|
|
||
| // 4. Disconnect | ||
| console.log('\n=== Disconnecting ==='); | ||
| await transport.close(); | ||
| console.log('Disconnected from MCP server'); | ||
|
|
||
| } catch (error) { | ||
| console.error('Error running client:', error); | ||
| process.exit(1); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Connect to an MCP server with backwards compatibility | ||
| * Following the spec for client backward compatibility | ||
| */ | ||
| async function connectWithBackwardsCompatibility(url: string): Promise<{ | ||
| client: Client, | ||
| transport: StreamableHTTPClientTransport | SSEClientTransport, | ||
| transportType: 'streamable-http' | 'sse' | ||
| }> { | ||
| console.log('1. Trying Streamable HTTP transport first...'); | ||
|
|
||
| // Step 1: Try Streamable HTTP transport first | ||
| const client = new Client({ | ||
| name: 'backwards-compatible-client', | ||
| version: '1.0.0' | ||
| }); | ||
|
|
||
| client.onerror = (error) => { | ||
| console.error('Client error:', error); | ||
| }; | ||
| const baseUrl = new URL(url); | ||
|
|
||
| try { | ||
| // Create modern transport | ||
| const streamableTransport = new StreamableHTTPClientTransport(baseUrl); | ||
| await client.connect(streamableTransport); | ||
|
|
||
| console.log('Successfully connected using modern Streamable HTTP transport.'); | ||
| return { | ||
| client, | ||
| transport: streamableTransport, | ||
| transportType: 'streamable-http' | ||
| }; | ||
| } catch (error) { | ||
| // Step 2: If transport fails, try the older SSE transport | ||
| console.log(`StreamableHttp transport connection failed: ${error}`); | ||
| console.log('2. Falling back to deprecated HTTP+SSE transport...'); | ||
|
|
||
| try { | ||
| // Create SSE transport pointing to /sse endpoint | ||
| const sseTransport = new SSEClientTransport(baseUrl); | ||
| await client.connect(sseTransport); | ||
|
|
||
| console.log('Successfully connected using deprecated HTTP+SSE transport.'); | ||
| return { | ||
| client, | ||
| transport: sseTransport, | ||
| transportType: 'sse' | ||
| }; | ||
| } catch (sseError) { | ||
| console.error(`Failed to connect with either transport method:\n1. Streamable HTTP error: ${error}\n2. SSE error: ${sseError}`); | ||
| throw new Error('Could not connect to server with any available transport'); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * List available tools on the server | ||
| */ | ||
| async function listTools(client: Client): Promise<void> { | ||
| try { | ||
| const toolsRequest: ListToolsRequest = { | ||
| method: 'tools/list', | ||
| params: {} | ||
| }; | ||
| const toolsResult = await client.request(toolsRequest, ListToolsResultSchema); | ||
|
|
||
| console.log('Available tools:'); | ||
| if (toolsResult.tools.length === 0) { | ||
| console.log(' No tools available'); | ||
| } else { | ||
| for (const tool of toolsResult.tools) { | ||
| console.log(` - ${tool.name}: ${tool.description}`); | ||
| } | ||
| } | ||
| } catch (error) { | ||
| console.log(`Tools not supported by this server: ${error}`); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Start a notification stream by calling the notification tool | ||
| */ | ||
| async function startNotificationTool(client: Client): Promise<void> { | ||
| try { | ||
| // Call the notification tool using reasonable defaults | ||
| const request: CallToolRequest = { | ||
| method: 'tools/call', | ||
| params: { | ||
| name: 'start-notification-stream', | ||
| arguments: { | ||
| interval: 1000, // 1 second between notifications | ||
| count: 5 // Send 5 notifications | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| console.log('Calling notification tool...'); | ||
| const result = await client.request(request, CallToolResultSchema); | ||
|
|
||
| console.log('Tool result:'); | ||
| result.content.forEach(item => { | ||
| if (item.type === 'text') { | ||
| console.log(` ${item.text}`); | ||
| } else { | ||
| console.log(` ${item.type} content:`, item); | ||
| } | ||
| }); | ||
| } catch (error) { | ||
| console.log(`Error calling notification tool: ${error}`); | ||
| } | ||
| } | ||
|
|
||
| // Start the client | ||
| main().catch((error: unknown) => { | ||
| console.error('Error running MCP client:', error); | ||
| process.exit(1); | ||
| }); | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| import express, { Request, Response } from 'express'; | ||
| import { McpServer } from '../../server/mcp.js'; | ||
| import { SSEServerTransport } from '../../server/sse.js'; | ||
| import { z } from 'zod'; | ||
| import { CallToolResult } from '../../types.js'; | ||
|
|
||
| /** | ||
| * This example server demonstrates the deprecated HTTP+SSE transport | ||
| * (protocol version 2024-11-05). It mainly used for testing backward compatible clients. | ||
| * | ||
| * The server exposes two endpoints: | ||
| * - /sse: For establishing the SSE stream (GET) | ||
| * - /messages: For receiving client messages (POST) | ||
| * | ||
| */ | ||
|
|
||
| // Create an MCP server instance | ||
| const server = new McpServer({ | ||
| name: 'simple-sse-server', | ||
| version: '1.0.0', | ||
| }, { capabilities: { logging: {} } }); | ||
|
|
||
| server.tool( | ||
| 'start-notification-stream', | ||
| 'Starts sending periodic notifications', | ||
| { | ||
| interval: z.number().describe('Interval in milliseconds between notifications').default(1000), | ||
| count: z.number().describe('Number of notifications to send').default(10), | ||
| }, | ||
| async ({ interval, count }, { sendNotification }): Promise<CallToolResult> => { | ||
| const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)); | ||
| let counter = 0; | ||
|
|
||
| // Send the initial notification | ||
| await sendNotification({ | ||
| method: "notifications/message", | ||
| params: { | ||
| level: "info", | ||
| data: `Starting notification stream with ${count} messages every ${interval}ms` | ||
| } | ||
| }); | ||
|
|
||
| // Send periodic notifications | ||
| while (counter < count) { | ||
| counter++; | ||
| await sleep(interval); | ||
|
|
||
| try { | ||
| await sendNotification({ | ||
| method: "notifications/message", | ||
| params: { | ||
| level: "info", | ||
| data: `Notification #${counter} at ${new Date().toISOString()}` | ||
| } | ||
| }); | ||
| } | ||
| catch (error) { | ||
| console.error("Error sending notification:", error); | ||
| } | ||
| } | ||
|
|
||
| return { | ||
| content: [ | ||
| { | ||
| type: 'text', | ||
| text: `Completed sending ${count} notifications every ${interval}ms`, | ||
| } | ||
| ], | ||
| }; | ||
| } | ||
| ); | ||
|
|
||
| const app = express(); | ||
| app.use(express.json()); | ||
|
|
||
| // Store transports by session ID | ||
| const transports: Record<string, SSEServerTransport> = {}; | ||
|
|
||
| // SSE endpoint for establishing the stream | ||
| app.get('/mcp', async (req: Request, res: Response) => { | ||
| console.log('Received GET request to /sse (establishing SSE stream)'); | ||
|
|
||
| try { | ||
| // Create a new SSE transport for the client | ||
| // The endpoint for POST messages is '/messages' | ||
| const transport = new SSEServerTransport('/messages', res); | ||
|
|
||
| // Store the transport by session ID | ||
| const sessionId = transport.sessionId; | ||
| transports[sessionId] = transport; | ||
|
|
||
| // Set up onclose handler to clean up transport when closed | ||
| transport.onclose = () => { | ||
| console.log(`SSE transport closed for session ${sessionId}`); | ||
| delete transports[sessionId]; | ||
| }; | ||
|
|
||
| // Connect the transport to the MCP server | ||
| await server.connect(transport); | ||
|
|
||
| // Start the SSE transport to begin streaming | ||
| // This sends an initial 'endpoint' event with the session ID in the URL | ||
| await transport.start(); | ||
|
|
||
| console.log(`Established SSE stream with session ID: ${sessionId}`); | ||
| } catch (error) { | ||
| console.error('Error establishing SSE stream:', error); | ||
| if (!res.headersSent) { | ||
| res.status(500).send('Error establishing SSE stream'); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // Messages endpoint for receiving client JSON-RPC requests | ||
| app.post('/messages', async (req: Request, res: Response) => { | ||
| console.log('Received POST request to /messages'); | ||
|
|
||
| // Extract session ID from URL query parameter | ||
| // In the SSE protocol, this is added by the client based on the endpoint event | ||
| const sessionId = req.query.sessionId as string | undefined; | ||
|
|
||
| if (!sessionId) { | ||
| console.error('No session ID provided in request URL'); | ||
| res.status(400).send('Missing sessionId parameter'); | ||
| return; | ||
| } | ||
|
|
||
| const transport = transports[sessionId]; | ||
| if (!transport) { | ||
| console.error(`No active transport found for session ID: ${sessionId}`); | ||
| res.status(404).send('Session not found'); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| // Handle the POST message with the transport | ||
| await transport.handlePostMessage(req, res, req.body); | ||
| } catch (error) { | ||
| console.error('Error handling request:', error); | ||
| if (!res.headersSent) { | ||
| res.status(500).send('Error handling request'); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| // Start the server | ||
| const PORT = 3000; | ||
| app.listen(PORT, () => { | ||
| console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`); | ||
| }); | ||
|
|
||
| // Handle server shutdown | ||
| process.on('SIGINT', async () => { | ||
| console.log('Shutting down server...'); | ||
|
|
||
| // Close all active transports to properly clean up resources | ||
| for (const sessionId in transports) { | ||
| try { | ||
| console.log(`Closing transport for session ${sessionId}`); | ||
| await transports[sessionId].close(); | ||
| delete transports[sessionId]; | ||
| } catch (error) { | ||
| console.error(`Error closing transport for session ${sessionId}:`, error); | ||
| } | ||
| } | ||
| await server.close(); | ||
| console.log('Server shutdown complete'); | ||
| process.exit(0); | ||
| }); |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.