From 52fb100805eedfaed04be01dc144a58d21e16a4e Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 28 Nov 2025 15:26:02 +0000 Subject: [PATCH 1/2] fix: don't reconnect POST stream when response was already received Per SEP-1699, the client should only reconnect when the server disconnects BEFORE sending a response (for long-running operations). The previous implementation would reconnect after ANY stream completion when a priming event was received, even when the response was already delivered. This fix tracks whether a response was received and skips reconnection if so. This prevents: 1. Unnecessary network round-trips 2. Race conditions with rapid stream cycling (which caused crashes in consumers like the inspector) Adds test case to verify the correct behavior. --- src/client/streamableHttp.test.ts | 55 +++++++++++++++++++++++++++++++ src/client/streamableHttp.ts | 19 ++++++++--- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 51a6a7dfa..792520d63 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -866,6 +866,61 @@ describe('StreamableHTTPClientTransport', () => { expect(reconnectHeaders.get('last-event-id')).toBe('event-123'); }); + it('should NOT reconnect a POST stream when response was received', async () => { + // ARRANGE + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 1, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + // Create a stream that sends: + // 1. Priming event with ID (enables potential reconnection) + // 2. The actual response (should prevent reconnection) + // 3. Then closes + const streamWithResponse = new ReadableStream({ + start(controller) { + // Priming event with ID + controller.enqueue(new TextEncoder().encode('id: priming-123\ndata: \n\n')); + // The actual response to the request + controller.enqueue( + new TextEncoder().encode('id: response-456\ndata: {"jsonrpc":"2.0","result":{"tools":[]},"id":"request-1"}\n\n') + ); + // Stream closes normally + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: streamWithResponse + }); + + const requestMessage: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'tools/list', + id: 'request-1', + params: {} + }; + + // ACT + await transport.start(); + await transport.send(requestMessage); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT + // THE KEY ASSERTION: Fetch was called ONCE only - no reconnection! + // The response was received, so no need to reconnect. + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + }); + it('should not throw JSON parse error on priming events with empty data', async () => { transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 2adc32e10..a00e3696d 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -307,6 +307,9 @@ export class StreamableHTTPClientTransport implements Transport { // Track whether we've received a priming event (event with ID) // Per spec, server SHOULD send a priming event with ID before closing let hasPrimingEvent = false; + // Track whether we've received a response - if so, no need to reconnect + // Per SEP-1699, reconnection is for when server disconnects BEFORE sending response + let receivedResponse = false; const processStream = async () => { // this is the closest we can get to trying to catch network errors // if something happens reader will throw @@ -346,8 +349,12 @@ export class StreamableHTTPClientTransport implements Transport { if (!event.event || event.event === 'message') { try { const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); - if (replayMessageId !== undefined && isJSONRPCResponse(message)) { - message.id = replayMessageId; + if (isJSONRPCResponse(message)) { + // Mark that we received a response - no need to reconnect for this request + receivedResponse = true; + if (replayMessageId !== undefined) { + message.id = replayMessageId; + } } this.onmessage?.(message); } catch (error) { @@ -359,8 +366,10 @@ export class StreamableHTTPClientTransport implements Transport { // Handle graceful server-side disconnect // Server may close connection after sending event ID and retry field // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; - if (canResume && this._abortController && !this._abortController.signal.aborted) { + const needsReconnect = canResume && !receivedResponse; + if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { this._scheduleReconnection( { resumptionToken: lastEventId, @@ -376,8 +385,10 @@ export class StreamableHTTPClientTransport implements Transport { // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; - if (canResume && this._abortController && !this._abortController.signal.aborted) { + const needsReconnect = canResume && !receivedResponse; + if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { // Use the exponential backoff reconnection strategy try { this._scheduleReconnection( From 3be665f282669bc9d343382f5203d87498c51b22 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Fri, 28 Nov 2025 18:28:04 +0000 Subject: [PATCH 2/2] fix: cancel pending reconnection timeout on close() Track the reconnection timeout ID and clear it when close() is called. This prevents reconnection attempts after the transport is closed, which was causing "Failed to reconnect SSE stream" errors on disconnect. Adds test case to verify reconnection is cancelled after close(). --- src/client/streamableHttp.test.ts | 51 +++++++++++++++++++++++++++++++ src/client/streamableHttp.ts | 11 ++++--- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 792520d63..596ad2310 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -921,6 +921,57 @@ describe('StreamableHTTPClientTransport', () => { expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); }); + it('should not attempt reconnection after close() is called', async () => { + // ARRANGE + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 100, + maxRetries: 3, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + // Stream with priming event + notification (no response) that closes + // This triggers reconnection scheduling + const streamWithPriming = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('id: event-123\ndata: {"jsonrpc":"2.0","method":"notifications/test","params":{}}\n\n') + ); + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + + // POST request returns streaming response + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: streamWithPriming + }); + + // ACT + await transport.start(); + await transport.send({ jsonrpc: '2.0', method: 'test', id: '1', params: {} }); + + // Wait a tick to let stream processing complete and schedule reconnection + await vi.advanceTimersByTimeAsync(10); + + // Now close() - reconnection timeout is pending (scheduled for 100ms) + await transport.close(); + + // Advance past reconnection delay + await vi.advanceTimersByTimeAsync(200); + + // ASSERT + // Only 1 call: the initial POST. No reconnection attempts after close(). + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + }); + it('should not throw JSON parse error on priming events with empty data', async () => { transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index a00e3696d..9cc4887df 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -136,6 +136,7 @@ export class StreamableHTTPClientTransport implements Transport { private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401 private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field + private _reconnectionTimeout?: ReturnType; onclose?: () => void; onerror?: (error: Error) => void; @@ -287,7 +288,7 @@ export class StreamableHTTPClientTransport implements Transport { const delay = this._getNextReconnectionDelay(attemptCount); // Schedule the reconnection - setTimeout(() => { + this._reconnectionTimeout = setTimeout(() => { // Use the last event ID to resume where we left off this._startOrAuthSse(options).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); @@ -308,7 +309,7 @@ export class StreamableHTTPClientTransport implements Transport { // Per spec, server SHOULD send a priming event with ID before closing let hasPrimingEvent = false; // Track whether we've received a response - if so, no need to reconnect - // Per SEP-1699, reconnection is for when server disconnects BEFORE sending response + // Reconnection is for when server disconnects BEFORE sending response let receivedResponse = false; const processStream = async () => { // this is the closest we can get to trying to catch network errors @@ -439,9 +440,11 @@ export class StreamableHTTPClientTransport implements Transport { } async close(): Promise { - // Abort any pending requests + if (this._reconnectionTimeout) { + clearTimeout(this._reconnectionTimeout); + this._reconnectionTimeout = undefined; + } this._abortController?.abort(); - this.onclose?.(); }