diff --git a/mcp/streamable.go b/mcp/streamable.go index b4b2fa31..fc3e627f 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -1773,15 +1773,14 @@ func (c *streamableClientConn) handleJSON(requestSummary string, resp *http.Resp // stream is complete when we receive its response. Otherwise, this is the // standalone stream. func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary string, resp *http.Response, forCall *jsonrpc2.Request) { + // Track the last event ID to detect progress. + // The retry counter is only reset when progress is made (lastEventID advances). + // This prevents infinite retry loops when a server repeatedly terminates + // connections without making progress (#679). + var prevLastEventID string + retriesWithoutProgress := 0 + for { - // Connection was successful. Continue the loop with the new response. - // - // TODO(#679): we should set a reasonable limit on the number of times - // we'll try getting a response for a given request, or enforce that we - // actually make progress. - // - // Eventually, if we don't get the response, we should stop trying and - // fail the request. lastEventID, reconnectDelay, clientClosed := c.processStream(ctx, requestSummary, resp, forCall) // If the connection was closed by the client, we're done. @@ -1795,6 +1794,23 @@ func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary str return } + // Check if we made progress (lastEventID advanced). + // Only reset the retry counter when actual progress is made. + if lastEventID != "" && lastEventID != prevLastEventID { + // Progress was made: reset the retry counter. + retriesWithoutProgress = 0 + prevLastEventID = lastEventID + } else { + // No progress: increment the retry counter. 
+ retriesWithoutProgress++ + if retriesWithoutProgress > c.maxRetries { + if ctx.Err() == nil { + c.fail(fmt.Errorf("%s: exceeded %d retries without progress (session ID: %v)", requestSummary, c.maxRetries, c.sessionID)) + } + return + } + } + // The stream was interrupted or ended by the server. Attempt to reconnect. newResp, err := c.connectSSE(ctx, lastEventID, reconnectDelay, false) if err != nil { diff --git a/mcp/streamable_client_test.go b/mcp/streamable_client_test.go index e2923325..586409bd 100644 --- a/mcp/streamable_client_test.go +++ b/mcp/streamable_client_test.go @@ -693,3 +693,98 @@ func TestStreamableClientTransientErrors(t *testing.T) { }) } } + +// TestStreamableClientRetryWithoutProgress verifies that the client fails after +// exceeding the retry limit when no progress is made (Last-Event-ID does not advance). +// This tests the fix for issue #679. +func TestStreamableClientRetryWithoutProgress(t *testing.T) { + // Speed up reconnection delays for testing. + const tick = 10 * time.Millisecond + defer func(delay time.Duration) { + reconnectInitialDelay = delay + }(reconnectInitialDelay) + reconnectInitialDelay = tick + + // Use the fakeStreamableServer pattern like other tests to avoid race conditions. + ctx := context.Background() + const maxRetries = 2 + var retryCount atomic.Int32 + + fake := &fakeStreamableServer{ + t: t, + responses: fakeResponses{ + {"POST", "", methodInitialize, ""}: { + header: header{ + "Content-Type": "application/json", + sessionIDHeader: "test-session", + }, + body: jsonBody(t, initResp), + }, + {"POST", "test-session", notificationInitialized, ""}: { + status: http.StatusAccepted, + wantProtocolVersion: latestProtocolVersion, + }, + {"GET", "test-session", "", ""}: { + // Disable standalone SSE stream to simplify the test. 
+ status: http.StatusMethodNotAllowed, + }, + {"POST", "test-session", methodCallTool, ""}: { + header: header{ + "Content-Type": "text/event-stream", + }, + // Return SSE stream with fixed event ID. + body: `id: fixed_1 +data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"test"}} + +`, + }, + // Resumption attempts with the same event ID (no progress). + {"GET", "test-session", "", "fixed_1"}: { + header: header{ + "Content-Type": "text/event-stream", + }, + responseFunc: func(r *jsonrpc.Request) (string, int) { + retryCount.Add(1) + // Return the same event ID - no progress. + return `id: fixed_1 +data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"retry"}} + +`, http.StatusOK + }, + }, + {"DELETE", "test-session", "", ""}: {optional: true}, + }, + } + + httpServer := httptest.NewServer(fake) + defer httpServer.Close() + + transport := &StreamableClientTransport{ + Endpoint: httpServer.URL, + MaxRetries: maxRetries, + } + client := NewClient(testImpl, nil) + session, err := client.Connect(ctx, transport, nil) + if err != nil { + t.Fatalf("Connect failed: %v", err) + } + defer session.Close() + + // Make a call that will trigger reconnections without progress. + _, err = session.CallTool(ctx, &CallToolParams{Name: "test"}) + if err == nil { + t.Fatal("CallTool succeeded unexpectedly, want error due to exceeded retries") + } + + // Check that the error reports the no-progress limit; a bare "exceeded" would also match "context deadline exceeded". + wantErr := "retries without progress" + if !strings.Contains(err.Error(), wantErr) { + t.Errorf("CallTool error = %q, want containing %q", err.Error(), wantErr) + } + + // Verify the number of resumption GETs. The first follows the initial stream, which counts as + // progress; maxRetries more without progress are allowed before the check (> maxRetries) fails the call. + if got := retryCount.Load(); got != int32(maxRetries+1) { + t.Errorf("retry count = %d, want exactly %d", got, maxRetries+1) + } +}