From e515a4395f09a38283d8defe06b68a0fdc4c4eac Mon Sep 17 00:00:00 2001 From: majiayu000 <1835304752@qq.com> Date: Wed, 31 Dec 2025 06:41:12 +0800 Subject: [PATCH] fix: enforce retry limit when SSE stream makes no progress (#679) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the StreamableClientTransport retry counter would reset whenever a connection was re-established, which could lead to infinite retry loops if a server continuously terminates connections without making progress (advancing the Last-Event-ID). This change tracks progress across retry attempts in handleSSE: - The retry counter is only reset when the Last-Event-ID advances - If maxRetries is exceeded without progress, the connection fails - This implements Option 2 from the issue discussion 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 Signed-off-by: majiayu000 <1835304752@qq.com> --- mcp/streamable.go | 32 +++++++++--- mcp/streamable_client_test.go | 95 +++++++++++++++++++++++++++++++++++ 2 files changed, 119 insertions(+), 8 deletions(-) diff --git a/mcp/streamable.go b/mcp/streamable.go index b4b2fa31..fc3e627f 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -1773,15 +1773,14 @@ func (c *streamableClientConn) handleJSON(requestSummary string, resp *http.Resp // stream is complete when we receive its response. Otherwise, this is the // standalone stream. func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary string, resp *http.Response, forCall *jsonrpc2.Request) { + // Track the last event ID to detect progress. + // The retry counter is only reset when progress is made (lastEventID advances). + // This prevents infinite retry loops when a server repeatedly terminates + // connections without making progress (#679). + var prevLastEventID string + retriesWithoutProgress := 0 + for { - // Connection was successful. Continue the loop with the new response. - // - // TODO(#679): we should set a reasonable limit on the number of times - // we'll try getting a response for a given request, or enforce that we - // actually make progress. - // - // Eventually, if we don't get the response, we should stop trying and - // fail the request. lastEventID, reconnectDelay, clientClosed := c.processStream(ctx, requestSummary, resp, forCall) // If the connection was closed by the client, we're done. @@ -1795,6 +1794,23 @@ func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary str return } + // Check if we made progress (lastEventID advanced). + // Only reset the retry counter when actual progress is made. + if lastEventID != "" && lastEventID != prevLastEventID { + // Progress was made: reset the retry counter. + retriesWithoutProgress = 0 + prevLastEventID = lastEventID + } else { + // No progress: increment the retry counter. + retriesWithoutProgress++ + if retriesWithoutProgress > c.maxRetries { + if ctx.Err() == nil { + c.fail(fmt.Errorf("%s: exceeded %d retries without progress (session ID: %v)", requestSummary, c.maxRetries, c.sessionID)) + } + return + } + } + // The stream was interrupted or ended by the server. Attempt to reconnect. newResp, err := c.connectSSE(ctx, lastEventID, reconnectDelay, false) if err != nil { diff --git a/mcp/streamable_client_test.go b/mcp/streamable_client_test.go index e2923325..e33f6c74 100644 --- a/mcp/streamable_client_test.go +++ b/mcp/streamable_client_test.go @@ -693,3 +693,98 @@ func TestStreamableClientTransientErrors(t *testing.T) { }) } } + +// TestStreamableClientRetryWithoutProgress verifies that the client fails after +// exceeding the retry limit when no progress is made (Last-Event-ID does not advance). +// This tests the fix for issue #679. +func TestStreamableClientRetryWithoutProgress(t *testing.T) { + // Speed up reconnection delays for testing. + const tick = 10 * time.Millisecond + defer func(delay time.Duration) { + reconnectInitialDelay = delay + }(reconnectInitialDelay) + reconnectInitialDelay = tick + + // Use the fakeStreamableServer pattern like other tests to avoid race conditions. + ctx := context.Background() + maxRetries := 2 + var retryCount atomic.Int32 + + fake := &fakeStreamableServer{ + t: t, + responses: fakeResponses{ + {"POST", "", methodInitialize, ""}: { + header: header{ + "Content-Type": "application/json", + sessionIDHeader: "test-session", + }, + body: jsonBody(t, initResp), + }, + {"POST", "test-session", notificationInitialized, ""}: { + status: http.StatusAccepted, + wantProtocolVersion: latestProtocolVersion, + }, + {"GET", "test-session", "", ""}: { + // Disable standalone SSE stream to simplify the test. + status: http.StatusMethodNotAllowed, + }, + {"POST", "test-session", methodCallTool, ""}: { + header: header{ + "Content-Type": "text/event-stream", + }, + // Return SSE stream with fixed event ID. + body: `id: fixed_1 +data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"test"}} + +`, + }, + // Resumption attempts with the same event ID (no progress). + {"GET", "test-session", "", "fixed_1"}: { + header: header{ + "Content-Type": "text/event-stream", + }, + responseFunc: func(r *jsonrpc.Request) (string, int) { + retryCount.Add(1) + // Return the same event ID - no progress. + return `id: fixed_1 +data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"retry"}} + +`, http.StatusOK + }, + }, + {"DELETE", "test-session", "", ""}: {optional: true}, + }, + } + + httpServer := httptest.NewServer(fake) + defer httpServer.Close() + + transport := &StreamableClientTransport{ + Endpoint: httpServer.URL, + MaxRetries: maxRetries, + } + client := NewClient(testImpl, nil) + session, err := client.Connect(ctx, transport, nil) + if err != nil { + t.Fatalf("Connect failed: %v", err) + } + defer session.Close() + + // Make a call that will trigger reconnections without progress. + _, err = session.CallTool(ctx, &CallToolParams{Name: "test"}) + if err == nil { + t.Fatal("CallTool succeeded unexpectedly, want error due to exceeded retries") + } + + // Check that the error mentions exceeding retries without progress. + wantErr := "exceeded" + if !strings.Contains(err.Error(), wantErr) { + t.Errorf("CallTool error = %q, want containing %q", err.Error(), wantErr) + } + + // Verify that we actually retried the expected number of times. + // We expect maxRetries+1 attempts because we increment before checking the limit. + if got := retryCount.Load(); got < int32(maxRetries) { + t.Errorf("retry count = %d, want at least %d", got, maxRetries) + } +}