Skip to content

Commit 7adbf9d

Browse files
committed
Resumability + redelivery + polling via server-side disconnect
1 parent ae17ba0 commit 7adbf9d

File tree

15 files changed

+1512
-57
lines changed

15 files changed

+1512
-57
lines changed

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ public class HttpServerTransportOptions
4343
/// </remarks>
4444
public bool Stateless { get; set; }
4545

46+
/// <summary>
47+
/// Gets or sets the event store for resumability support.
48+
/// When set, events are stored and can be replayed when clients reconnect with a Last-Event-ID header.
49+
/// </summary>
50+
/// <remarks>
51+
/// When configured, the server will:
52+
/// <list type="bullet">
53+
/// <item><description>Generate unique event IDs for each SSE message</description></item>
54+
/// <item><description>Store events for later replay</description></item>
55+
/// <item><description>Replay missed events when a client reconnects with a Last-Event-ID header</description></item>
56+
/// <item><description>Send priming events to establish resumability before any actual messages</description></item>
57+
/// </list>
58+
/// </remarks>
59+
public IEventStore? EventStore { get; set; }
60+
61+
/// <summary>
62+
/// Gets or sets the retry interval to suggest to clients in SSE retry field.
63+
/// </summary>
64+
/// <value>
65+
/// The retry interval. The default is <see langword="null"/>, meaning no retry field is sent.
66+
/// </value>
67+
/// <remarks>
68+
/// When set along with <see cref="EventStore"/>, the server will include a retry field in priming events.
69+
/// This suggests to clients how long to wait before attempting to reconnect after a connection is lost.
70+
/// Clients may use this value to implement polling behavior during long-running operations.
71+
/// </remarks>
72+
public TimeSpan? RetryInterval { get; set; }
73+
4674
/// <summary>
4775
/// Gets or sets a value that indicates whether the server uses a single execution context for the entire session.
4876
/// </summary>

src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal sealed class StreamableHttpHandler(
2323
ILoggerFactory loggerFactory)
2424
{
2525
private const string McpSessionIdHeaderName = "Mcp-Session-Id";
26+
private const string LastEventIdHeaderName = "Last-Event-ID";
2627

2728
private static readonly JsonTypeInfo<JsonRpcMessage> s_messageTypeInfo = GetRequiredJsonTypeInfo<JsonRpcMessage>();
2829
private static readonly JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
@@ -88,10 +89,15 @@ await WriteJsonRpcErrorAsync(context,
8889
return;
8990
}
9091

91-
if (!session.TryStartGetRequest())
92+
// Check for Last-Event-ID header for resumability
93+
var lastEventId = context.Request.Headers[LastEventIdHeaderName].ToString();
94+
var isResumption = !string.IsNullOrEmpty(lastEventId);
95+
96+
// Only check TryStartGetRequest for new connections, not resumptions
97+
if (!isResumption && !session.TryStartGetRequest())
9298
{
9399
await WriteJsonRpcErrorAsync(context,
94-
"Bad Request: This server does not support multiple GET requests. Start a new session to get a new GET SSE response.",
100+
"Bad Request: This server does not support multiple GET requests. Use Last-Event-ID header to resume or start a new session.",
95101
StatusCodes.Status400BadRequest);
96102
return;
97103
}
@@ -111,7 +117,7 @@ await WriteJsonRpcErrorAsync(context,
111117
// will be sent in response to a different POST request. It might be a while before we send a message
112118
// over this response body.
113119
await context.Response.Body.FlushAsync(cancellationToken);
114-
await session.Transport.HandleGetRequestAsync(context.Response.Body, cancellationToken);
120+
await session.Transport.HandleGetRequestAsync(context.Response.Body, isResumption ? lastEventId : null, cancellationToken);
115121
}
116122
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
117123
{
@@ -194,7 +200,10 @@ private async ValueTask<StreamableHttpSession> StartNewSessionAsync(HttpContext
194200
{
195201
SessionId = sessionId,
196202
FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext,
203+
EventStore = HttpServerTransportOptions.EventStore,
204+
RetryInterval = HttpServerTransportOptions.RetryInterval,
197205
};
206+
198207
context.Response.Headers[McpSessionIdHeaderName] = sessionId;
199208
}
200209
else

src/ModelContextProtocol.Core/Client/HttpClientTransportOptions.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,17 @@ public required Uri Endpoint
106106
/// Gets sor sets the authorization provider to use for authentication.
107107
/// </summary>
108108
public ClientOAuthOptions? OAuth { get; set; }
109+
110+
/// <summary>
111+
/// Gets or sets the maximum number of reconnection attempts when an SSE stream is disconnected.
112+
/// </summary>
113+
/// <value>
114+
/// The maximum number of reconnection attempts. The default is 2.
115+
/// </value>
116+
/// <remarks>
117+
/// When an SSE stream is disconnected (e.g., due to a network issue), the client will attempt to
118+
/// reconnect using the Last-Event-ID header to resume from where it left off. This property controls
119+
/// how many reconnection attempts are made before giving up.
120+
/// </remarks>
121+
public int MaxReconnectionAttempts { get; set; } = 2;
109122
}

src/ModelContextProtocol.Core/Client/StreamableHttpClientSessionTransport.cs

Lines changed: 142 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ internal sealed partial class StreamableHttpClientSessionTransport : TransportBa
1616
private static readonly MediaTypeWithQualityHeaderValue s_applicationJsonMediaType = new("application/json");
1717
private static readonly MediaTypeWithQualityHeaderValue s_textEventStreamMediaType = new("text/event-stream");
1818

19+
private static readonly TimeSpan s_defaultReconnectionDelay = TimeSpan.FromSeconds(1);
20+
1921
private readonly McpHttpClient _httpClient;
2022
private readonly HttpClientTransportOptions _options;
2123
private readonly CancellationTokenSource _connectionCts;
@@ -106,7 +108,17 @@ internal async Task<HttpResponseMessage> SendHttpRequestAsync(JsonRpcMessage mes
106108
else if (response.Content.Headers.ContentType?.MediaType == "text/event-stream")
107109
{
108110
using var responseBodyStream = await response.Content.ReadAsStreamAsync(cancellationToken);
109-
rpcResponseOrError = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false);
111+
var sseState = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false);
112+
rpcResponseOrError = sseState.Response;
113+
114+
// Resumability: If POST SSE stream ended without a response but we have a Last-Event-ID (from priming),
115+
// attempt to resume by sending a GET request with Last-Event-ID header. The server will replay
116+
// events from the event store, allowing us to receive the pending response.
117+
if (rpcResponseOrError is null && rpcRequest is not null && sseState.LastEventId is not null)
118+
{
119+
var resumeResult = await SendGetSseRequestWithRetriesAsync(rpcRequest, sseState, cancellationToken).ConfigureAwait(false);
120+
rpcResponseOrError = resumeResult.Response;
121+
}
110122
}
111123

112124
if (rpcRequest is null)
@@ -188,54 +200,135 @@ public override async ValueTask DisposeAsync()
188200

189201
private async Task ReceiveUnsolicitedMessagesAsync()
190202
{
191-
// Send a GET request to handle any unsolicited messages not sent over a POST response.
192-
using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint);
193-
request.Headers.Accept.Add(s_textEventStreamMediaType);
194-
CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion);
203+
var state = new SseStreamState();
195204

196-
// Server support for the GET request is optional. If it fails, we don't care. It just means we won't receive unsolicited messages.
197-
HttpResponseMessage response;
198-
try
199-
{
200-
response = await _httpClient.SendAsync(request, message: null, _connectionCts.Token).ConfigureAwait(false);
201-
}
202-
catch (HttpRequestException)
205+
// Continuously receive unsolicited messages until cancelled
206+
while (!_connectionCts.Token.IsCancellationRequested)
203207
{
204-
return;
208+
var result = await SendGetSseRequestWithRetriesAsync(
209+
relatedRpcRequest: null,
210+
state,
211+
_connectionCts.Token).ConfigureAwait(false);
212+
213+
// Update state for next reconnection attempt
214+
state.UpdateFrom(result);
215+
216+
// If we exhausted retries without receiving any events, stop trying
217+
if (result.LastEventId is null)
218+
{
219+
return;
220+
}
205221
}
222+
}
223+
224+
/// <summary>
225+
/// Sends a GET request for SSE with retry logic and resumability support.
226+
/// </summary>
227+
private async Task<SseStreamState> SendGetSseRequestWithRetriesAsync(
228+
JsonRpcRequest? relatedRpcRequest,
229+
SseStreamState state,
230+
CancellationToken cancellationToken)
231+
{
232+
int attempt = 0;
233+
234+
// Delay before first attempt if we're reconnecting (have a Last-Event-ID)
235+
bool shouldDelay = state.LastEventId is not null;
206236

207-
using (response)
237+
while (attempt < _options.MaxReconnectionAttempts)
208238
{
209-
if (!response.IsSuccessStatusCode)
239+
cancellationToken.ThrowIfCancellationRequested();
240+
241+
if (shouldDelay)
210242
{
211-
return;
243+
var delay = state.RetryInterval ?? s_defaultReconnectionDelay;
244+
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
212245
}
246+
shouldDelay = true;
247+
248+
using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint);
249+
request.Headers.Accept.Add(s_textEventStreamMediaType);
250+
CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion, state.LastEventId);
251+
252+
HttpResponseMessage response;
253+
try
254+
{
255+
response = await _httpClient.SendAsync(request, message: null, cancellationToken).ConfigureAwait(false);
256+
}
257+
catch (HttpRequestException)
258+
{
259+
attempt++;
260+
continue;
261+
}
262+
263+
using (response)
264+
{
265+
if (!response.IsSuccessStatusCode)
266+
{
267+
attempt++;
268+
continue;
269+
}
270+
271+
using var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
272+
var result = await ProcessSseResponseAsync(responseStream, relatedRpcRequest, cancellationToken).ConfigureAwait(false);
273+
274+
state.UpdateFrom(result);
275+
276+
if (result.Response is not null)
277+
{
278+
return state;
279+
}
213280

214-
using var responseStream = await response.Content.ReadAsStreamAsync(_connectionCts.Token).ConfigureAwait(false);
215-
await ProcessSseResponseAsync(responseStream, relatedRpcRequest: null, _connectionCts.Token).ConfigureAwait(false);
281+
// Stream closed without the response
282+
if (state.LastEventId is null)
283+
{
284+
// No event ID means server may not support resumability - don't retry indefinitely
285+
attempt++;
286+
}
287+
else
288+
{
289+
// We have an event ID, so reconnection should work - reset attempts
290+
attempt = 0;
291+
}
292+
}
216293
}
294+
295+
return state;
217296
}
218297

219-
private async Task<JsonRpcMessageWithId?> ProcessSseResponseAsync(Stream responseStream, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken)
298+
private async Task<SseStreamState> ProcessSseResponseAsync(
299+
Stream responseStream,
300+
JsonRpcRequest? relatedRpcRequest,
301+
CancellationToken cancellationToken)
220302
{
303+
var state = new SseStreamState();
304+
221305
await foreach (SseItem<string> sseEvent in SseParser.Create(responseStream).EnumerateAsync(cancellationToken).ConfigureAwait(false))
222306
{
223-
if (sseEvent.EventType != "message")
307+
// Track event ID and retry interval for resumability
308+
if (!string.IsNullOrEmpty(sseEvent.EventId))
309+
{
310+
state.LastEventId = sseEvent.EventId;
311+
}
312+
if (sseEvent.ReconnectionInterval.HasValue)
313+
{
314+
state.RetryInterval = sseEvent.ReconnectionInterval.Value;
315+
}
316+
317+
// Skip events with empty data (priming events, keep-alives)
318+
if (string.IsNullOrEmpty(sseEvent.Data) || sseEvent.EventType != "message")
224319
{
225320
continue;
226321
}
227322

228323
var rpcResponseOrError = await ProcessMessageAsync(sseEvent.Data, relatedRpcRequest, cancellationToken).ConfigureAwait(false);
229-
230-
// The server SHOULD end the HTTP response body here anyway, but we won't leave it to chance. This transport makes
231-
// a GET request for any notifications that might need to be sent after the completion of each POST.
232324
if (rpcResponseOrError is not null)
233325
{
234-
return rpcResponseOrError;
326+
state.Response = rpcResponseOrError;
327+
return state;
235328
}
236329
}
237330

238-
return null;
331+
return state;
239332
}
240333

241334
private async Task<JsonRpcMessageWithId?> ProcessMessageAsync(string data, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken)
@@ -292,7 +385,8 @@ internal static void CopyAdditionalHeaders(
292385
HttpRequestHeaders headers,
293386
IDictionary<string, string>? additionalHeaders,
294387
string? sessionId,
295-
string? protocolVersion)
388+
string? protocolVersion,
389+
string? lastEventId = null)
296390
{
297391
if (sessionId is not null)
298392
{
@@ -304,6 +398,11 @@ internal static void CopyAdditionalHeaders(
304398
headers.Add("MCP-Protocol-Version", protocolVersion);
305399
}
306400

401+
if (lastEventId is not null)
402+
{
403+
headers.Add("Last-Event-ID", lastEventId);
404+
}
405+
307406
if (additionalHeaders is null)
308407
{
309408
return;
@@ -317,4 +416,21 @@ internal static void CopyAdditionalHeaders(
317416
}
318417
}
319418
}
419+
420+
/// <summary>
421+
/// Tracks state across SSE stream connections.
422+
/// </summary>
423+
private struct SseStreamState
424+
{
425+
public JsonRpcMessageWithId? Response;
426+
public string? LastEventId;
427+
public TimeSpan? RetryInterval;
428+
429+
public void UpdateFrom(SseStreamState other)
430+
{
431+
Response ??= other.Response;
432+
LastEventId ??= other.LastEventId;
433+
RetryInterval ??= other.RetryInterval;
434+
}
435+
}
320436
}

src/ModelContextProtocol.Core/McpSessionHandler.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,32 @@ internal sealed partial class McpSessionHandler : IAsyncDisposable
2929
"mcp.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false);
3030

3131
/// <summary>The latest version of the protocol supported by this implementation.</summary>
32-
internal const string LatestProtocolVersion = "2025-06-18";
32+
internal const string LatestProtocolVersion = "2025-11-25";
3333

3434
/// <summary>All protocol versions supported by this implementation.</summary>
3535
internal static readonly string[] SupportedProtocolVersions =
3636
[
3737
"2024-11-05",
3838
"2025-03-26",
39+
"2025-06-18",
3940
LatestProtocolVersion,
4041
];
4142

43+
/// <summary>
44+
/// Checks if the given protocol version supports resumability (priming events).
45+
/// </summary>
46+
/// <param name="protocolVersion">The protocol version to check.</param>
47+
/// <returns>True if the protocol version supports resumability.</returns>
48+
/// <remarks>
49+
/// Priming events are only supported in protocol version &gt;= 2025-11-25.
50+
/// Older clients may crash when receiving SSE events with empty data.
51+
/// </remarks>
52+
internal static bool SupportsResumability(string? protocolVersion)
53+
{
54+
const string MinResumabilityProtocolVersion = "2025-11-25";
55+
return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0;
56+
}
57+
4258
private readonly bool _isServer;
4359
private readonly string _transportKind;
4460
private readonly ITransport _transport;

0 commit comments

Comments
 (0)