Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1773,15 +1773,14 @@ func (c *streamableClientConn) handleJSON(requestSummary string, resp *http.Resp
// stream is complete when we receive its response. Otherwise, this is the
// standalone stream.
func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary string, resp *http.Response, forCall *jsonrpc2.Request) {
// Track the last event ID to detect progress.
// The retry counter is only reset when progress is made (lastEventID advances).
// This prevents infinite retry loops when a server repeatedly terminates
// connections without making progress (#679).
var prevLastEventID string
retriesWithoutProgress := 0

for {
// Connection was successful. Continue the loop with the new response.
//
// TODO(#679): we should set a reasonable limit on the number of times
// we'll try getting a response for a given request, or enforce that we
// actually make progress.
//
// Eventually, if we don't get the response, we should stop trying and
// fail the request.
lastEventID, reconnectDelay, clientClosed := c.processStream(ctx, requestSummary, resp, forCall)

// If the connection was closed by the client, we're done.
Expand All @@ -1795,6 +1794,23 @@ func (c *streamableClientConn) handleSSE(ctx context.Context, requestSummary str
return
}

// Check if we made progress (lastEventID advanced).
// Only reset the retry counter when actual progress is made.
if lastEventID != "" && lastEventID != prevLastEventID {
// Progress was made: reset the retry counter.
retriesWithoutProgress = 0
prevLastEventID = lastEventID
} else {
// No progress: increment the retry counter.
retriesWithoutProgress++
if retriesWithoutProgress > c.maxRetries {
if ctx.Err() == nil {
c.fail(fmt.Errorf("%s: exceeded %d retries without progress (session ID: %v)", requestSummary, c.maxRetries, c.sessionID))
}
return
}
}

// The stream was interrupted or ended by the server. Attempt to reconnect.
newResp, err := c.connectSSE(ctx, lastEventID, reconnectDelay, false)
if err != nil {
Expand Down
95 changes: 95 additions & 0 deletions mcp/streamable_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,3 +693,98 @@ func TestStreamableClientTransientErrors(t *testing.T) {
})
}
}

// TestStreamableClientRetryWithoutProgress verifies that the client fails after
// exceeding the retry limit when no progress is made (Last-Event-ID does not advance).
// This tests the fix for issue #679.
func TestStreamableClientRetryWithoutProgress(t *testing.T) {
// Speed up reconnection delays for testing.
const tick = 10 * time.Millisecond
defer func(delay time.Duration) {
reconnectInitialDelay = delay
}(reconnectInitialDelay)
reconnectInitialDelay = tick

// Use the fakeStreamableServer pattern like other tests to avoid race conditions.
ctx := context.Background()
maxRetries := 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const maxRetries = 2

(more readable, and avoids some conversions below)

var retryCount atomic.Int32

fake := &fakeStreamableServer{
t: t,
responses: fakeResponses{
{"POST", "", methodInitialize, ""}: {
header: header{
"Content-Type": "application/json",
sessionIDHeader: "test-session",
},
body: jsonBody(t, initResp),
},
{"POST", "test-session", notificationInitialized, ""}: {
status: http.StatusAccepted,
wantProtocolVersion: latestProtocolVersion,
},
{"GET", "test-session", "", ""}: {
// Disable standalone SSE stream to simplify the test.
status: http.StatusMethodNotAllowed,
},
{"POST", "test-session", methodCallTool, ""}: {
header: header{
"Content-Type": "text/event-stream",
},
// Return SSE stream with fixed event ID.
body: `id: fixed_1
data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"test"}}

`,
},
// Resumption attempts with the same event ID (no progress).
{"GET", "test-session", "", "fixed_1"}: {
header: header{
"Content-Type": "text/event-stream",
},
responseFunc: func(r *jsonrpc.Request) (string, int) {
retryCount.Add(1)
// Return the same event ID - no progress.
return `id: fixed_1
data: {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"retry"}}

`, http.StatusOK
},
},
{"DELETE", "test-session", "", ""}: {optional: true},
},
}

httpServer := httptest.NewServer(fake)
defer httpServer.Close()

transport := &StreamableClientTransport{
Endpoint: httpServer.URL,
MaxRetries: maxRetries,
}
client := NewClient(testImpl, nil)
session, err := client.Connect(ctx, transport, nil)
if err != nil {
t.Fatalf("Connect failed: %v", err)
}
defer session.Close()

// Make a call that will trigger reconnections without progress.
_, err = session.CallTool(ctx, &CallToolParams{Name: "test"})
if err == nil {
t.Fatal("CallTool succeeded unexpectedly, want error due to exceeded retries")
}

// Check that the error mentions exceeding retries without progress.
wantErr := "exceeded"
if !strings.Contains(err.Error(), wantErr) {
t.Errorf("CallTool error = %q, want containing %q", err.Error(), wantErr)
}

// Verify that we actually retried the expected number of times.
// We expect maxRetries+1 attempts because we increment before checking the limit.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, make this more assertive: assert that got == maxRetries+1

if got := retryCount.Load(); got < int32(maxRetries) {
t.Errorf("retry count = %d, want at least %d", got, maxRetries)
}
}