diff --git a/src/client/streamableHttp.test.ts b/src/client/streamableHttp.test.ts index 51a6a7dfa..596ad2310 100644 --- a/src/client/streamableHttp.test.ts +++ b/src/client/streamableHttp.test.ts @@ -866,6 +866,112 @@ describe('StreamableHTTPClientTransport', () => { expect(reconnectHeaders.get('last-event-id')).toBe('event-123'); }); + it('should NOT reconnect a POST stream when response was received', async () => { + // ARRANGE + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 10, + maxRetries: 1, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + // Create a stream that sends: + // 1. Priming event with ID (enables potential reconnection) + // 2. The actual response (should prevent reconnection) + // 3. Then closes + const streamWithResponse = new ReadableStream({ + start(controller) { + // Priming event with ID + controller.enqueue(new TextEncoder().encode('id: priming-123\ndata: \n\n')); + // The actual response to the request + controller.enqueue( + new TextEncoder().encode('id: response-456\ndata: {"jsonrpc":"2.0","result":{"tools":[]},"id":"request-1"}\n\n') + ); + // Stream closes normally + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: streamWithResponse + }); + + const requestMessage: JSONRPCRequest = { + jsonrpc: '2.0', + method: 'tools/list', + id: 'request-1', + params: {} + }; + + // ACT + await transport.start(); + await transport.send(requestMessage); + await vi.advanceTimersByTimeAsync(50); + + // ASSERT + // THE KEY ASSERTION: Fetch was called ONCE only - no reconnection! + // The response was received, so no need to reconnect. + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + }); + + it('should not attempt reconnection after close() is called', async () => { + // ARRANGE + transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), { + reconnectionOptions: { + initialReconnectionDelay: 100, + maxRetries: 3, + maxReconnectionDelay: 1000, + reconnectionDelayGrowFactor: 1 + } + }); + + // Stream with priming event + notification (no response) that closes + // This triggers reconnection scheduling + const streamWithPriming = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('id: event-123\ndata: {"jsonrpc":"2.0","method":"notifications/test","params":{}}\n\n') + ); + controller.close(); + } + }); + + const fetchMock = global.fetch as Mock; + + // POST request returns streaming response + fetchMock.mockResolvedValueOnce({ + ok: true, + status: 200, + headers: new Headers({ 'content-type': 'text/event-stream' }), + body: streamWithPriming + }); + + // ACT + await transport.start(); + await transport.send({ jsonrpc: '2.0', method: 'test', id: '1', params: {} }); + + // Wait a tick to let stream processing complete and schedule reconnection + await vi.advanceTimersByTimeAsync(10); + + // Now close() - reconnection timeout is pending (scheduled for 100ms) + await transport.close(); + + // Advance past reconnection delay + await vi.advanceTimersByTimeAsync(200); + + // ASSERT + // Only 1 call: the initial POST. No reconnection attempts after close(). + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock.mock.calls[0][1]?.method).toBe('POST'); + }); + it('should not throw JSON parse error on priming events with empty data', async () => { transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp')); diff --git a/src/client/streamableHttp.ts b/src/client/streamableHttp.ts index 2adc32e10..9cc4887df 100644 --- a/src/client/streamableHttp.ts +++ b/src/client/streamableHttp.ts @@ -136,6 +136,7 @@ export class StreamableHTTPClientTransport implements Transport { private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401 private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping. private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field + private _reconnectionTimeout?: ReturnType; onclose?: () => void; onerror?: (error: Error) => void; @@ -287,7 +288,7 @@ export class StreamableHTTPClientTransport implements Transport { const delay = this._getNextReconnectionDelay(attemptCount); // Schedule the reconnection - setTimeout(() => { + this._reconnectionTimeout = setTimeout(() => { // Use the last event ID to resume where we left off this._startOrAuthSse(options).catch(error => { this.onerror?.(new Error(`Failed to reconnect SSE stream: ${error instanceof Error ? error.message : String(error)}`)); @@ -307,6 +308,9 @@ export class StreamableHTTPClientTransport implements Transport { // Track whether we've received a priming event (event with ID) // Per spec, server SHOULD send a priming event with ID before closing let hasPrimingEvent = false; + // Track whether we've received a response - if so, no need to reconnect + // Reconnection is for when server disconnects BEFORE sending response + let receivedResponse = false; const processStream = async () => { // this is the closest we can get to trying to catch network errors // if something happens reader will throw @@ -346,8 +350,12 @@ export class StreamableHTTPClientTransport implements Transport { if (!event.event || event.event === 'message') { try { const message = JSONRPCMessageSchema.parse(JSON.parse(event.data)); - if (replayMessageId !== undefined && isJSONRPCResponse(message)) { - message.id = replayMessageId; + if (isJSONRPCResponse(message)) { + // Mark that we received a response - no need to reconnect for this request + receivedResponse = true; + if (replayMessageId !== undefined) { + message.id = replayMessageId; + } } this.onmessage?.(message); } catch (error) { @@ -359,8 +367,10 @@ export class StreamableHTTPClientTransport implements Transport { // Handle graceful server-side disconnect // Server may close connection after sending event ID and retry field // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; - if (canResume && this._abortController && !this._abortController.signal.aborted) { + const needsReconnect = canResume && !receivedResponse; + if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { this._scheduleReconnection( { resumptionToken: lastEventId, @@ -376,8 +386,10 @@ export class StreamableHTTPClientTransport implements Transport { // Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing // Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID) + // BUT don't reconnect if we already received a response - the request is complete const canResume = isReconnectable || hasPrimingEvent; - if (canResume && this._abortController && !this._abortController.signal.aborted) { + const needsReconnect = canResume && !receivedResponse; + if (needsReconnect && this._abortController && !this._abortController.signal.aborted) { // Use the exponential backoff reconnection strategy try { this._scheduleReconnection( @@ -428,9 +440,11 @@ export class StreamableHTTPClientTransport implements Transport { } async close(): Promise { - // Abort any pending requests + if (this._reconnectionTimeout) { + clearTimeout(this._reconnectionTimeout); + this._reconnectionTimeout = undefined; + } this._abortController?.abort(); - this.onclose?.(); }