Skip to content

Commit 63fb022

Browse files
committed
Server-side resumability via Last-Event-ID
1 parent 677d9ea commit 63fb022

File tree

10 files changed

+411
-76
lines changed

10 files changed

+411
-76
lines changed

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,34 @@ public class HttpServerTransportOptions
4343
/// </remarks>
4444
public bool Stateless { get; set; }
4545

46+
/// <summary>
47+
/// Gets or sets the event store for resumability support.
48+
/// When set, events are stored and can be replayed when clients reconnect with a Last-Event-ID header.
49+
/// </summary>
50+
/// <remarks>
51+
/// When configured, the server will:
52+
/// <list type="bullet">
53+
/// <item><description>Generate unique event IDs for each SSE message</description></item>
54+
/// <item><description>Store events for later replay</description></item>
55+
/// <item><description>Replay missed events when a client reconnects with a Last-Event-ID header</description></item>
56+
/// <item><description>Send priming events to establish resumability before any actual messages</description></item>
57+
/// </list>
58+
/// </remarks>
59+
public ISseEventStreamStore? EventStreamStore { get; set; }
60+
61+
/// <summary>
62+
/// Gets or sets the retry interval to suggest to clients in SSE retry field.
63+
/// </summary>
64+
/// <value>
65+
/// The retry interval. The default is 1 second.
66+
/// </value>
67+
/// <remarks>
68+
/// When <see cref="EventStreamStore"/> is set, the server will include a retry field in priming events.
69+
/// This value suggests to clients how long to wait before attempting to reconnect after a connection is lost.
70+
/// Clients may use this value to implement polling behavior during long-running operations.
71+
/// </remarks>
72+
public TimeSpan RetryInterval { get; set; } = TimeSpan.FromSeconds(1);
73+
4674
/// <summary>
4775
/// Gets or sets a value that indicates whether the server uses a single execution context for the entire session.
4876
/// </summary>

src/ModelContextProtocol.AspNetCore/McpEndpointRouteBuilderExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo
3838
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]))
3939
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status202Accepted));
4040

41+
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
42+
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
43+
4144
if (!streamableHttpHandler.HttpServerTransportOptions.Stateless)
4245
{
43-
// The GET and DELETE endpoints are not mapped in Stateless mode since there's no way to send unsolicited messages
44-
// for the GET to handle, and there is no server-side state for the DELETE to clean up.
45-
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
46-
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));
46+
// The DELETE endpoints are not mapped in Stateless mode since there's no server-side state to clean up.
4747
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);
4848

4949
// Map legacy HTTP with SSE endpoints only if not in Stateless mode, because we cannot guarantee the /message requests

src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ internal sealed class StreamableHttpHandler(
2323
ILoggerFactory loggerFactory)
2424
{
2525
private const string McpSessionIdHeaderName = "Mcp-Session-Id";
26+
private const string LastEventIdHeaderName = "Last-Event-ID";
2627

2728
private static readonly JsonTypeInfo<JsonRpcMessage> s_messageTypeInfo = GetRequiredJsonTypeInfo<JsonRpcMessage>();
2829
private static readonly JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
@@ -81,17 +82,80 @@ await WriteJsonRpcErrorAsync(context,
8182
return;
8283
}
8384

85+
StreamableHttpSession? session = null;
86+
ISseEventStreamReader? eventStreamReader = null;
87+
8488
var sessionId = context.Request.Headers[McpSessionIdHeaderName].ToString();
85-
var session = await GetSessionAsync(context, sessionId);
89+
var lastEventId = context.Request.Headers[LastEventIdHeaderName].ToString();
90+
91+
if (!string.IsNullOrEmpty(sessionId))
92+
{
93+
session = await GetSessionAsync(context, sessionId);
94+
if (session is null)
95+
{
96+
// There was an error obtaining the session; consider the request failed.
97+
return;
98+
}
99+
}
100+
101+
if (!string.IsNullOrEmpty(lastEventId))
102+
{
103+
if (HttpServerTransportOptions.Stateless)
104+
{
105+
await WriteJsonRpcErrorAsync(context,
106+
"Bad Request: The Last-Event-ID header is not supported in stateless mode.",
107+
StatusCodes.Status400BadRequest);
108+
return;
109+
}
110+
111+
eventStreamReader = await GetEventStreamReaderAsync(context, lastEventId);
112+
if (eventStreamReader is null)
113+
{
114+
// There was an error obtaining the event stream; consider the request failed.
115+
return;
116+
}
117+
}
118+
119+
if (session is not null && eventStreamReader is not null && !string.Equals(session.Id, eventStreamReader.SessionId, StringComparison.Ordinal))
120+
{
121+
await WriteJsonRpcErrorAsync(context,
122+
"Bad Request: The Last-Event-ID header refers to a session with a different session ID.",
123+
StatusCodes.Status400BadRequest);
124+
return;
125+
}
126+
127+
if (eventStreamReader is null || string.Equals(eventStreamReader.StreamId, StreamableHttpServerTransport.UnsolicitedMessageStreamId, StringComparison.Ordinal))
128+
{
129+
await HandleUnsolicitedMessageStreamAsync(context, session, eventStreamReader);
130+
}
131+
else
132+
{
133+
await HandleResumePostResponseStreamAsync(context, eventStreamReader);
134+
}
135+
}
136+
137+
private async Task HandleUnsolicitedMessageStreamAsync(HttpContext context, StreamableHttpSession? session, ISseEventStreamReader? eventStreamReader)
138+
{
139+
if (HttpServerTransportOptions.Stateless)
140+
{
141+
await WriteJsonRpcErrorAsync(context,
142+
"Bad Request: Unsolicited messages are not supported in stateless mode.",
143+
StatusCodes.Status400BadRequest);
144+
return;
145+
}
146+
86147
if (session is null)
87148
{
149+
await WriteJsonRpcErrorAsync(context,
150+
"Bad Request: Mcp-Session-Id header is required",
151+
StatusCodes.Status400BadRequest);
88152
return;
89153
}
90154

91-
if (!session.TryStartGetRequest())
155+
if (eventStreamReader is null && !session.TryStartGetRequest())
92156
{
93157
await WriteJsonRpcErrorAsync(context,
94-
"Bad Request: This server does not support multiple GET requests. Start a new session to get a new GET SSE response.",
158+
"Bad Request: This server does not support multiple GET requests. Use Last-Event-ID header to resume or start a new session.",
95159
StatusCodes.Status400BadRequest);
96160
return;
97161
}
@@ -111,7 +175,7 @@ await WriteJsonRpcErrorAsync(context,
111175
// will be sent in response to a different POST request. It might be a while before we send a message
112176
// over this response body.
113177
await context.Response.Body.FlushAsync(cancellationToken);
114-
await session.Transport.HandleGetRequestAsync(context.Response.Body, cancellationToken);
178+
await session.Transport.HandleGetRequestAsync(context.Response.Body, eventStreamReader, cancellationToken);
115179
}
116180
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
117181
{
@@ -120,6 +184,12 @@ await WriteJsonRpcErrorAsync(context,
120184
}
121185
}
122186

187+
private static async Task HandleResumePostResponseStreamAsync(HttpContext context, ISseEventStreamReader eventStreamReader)
188+
{
189+
InitializeSseResponse(context);
190+
await eventStreamReader.CopyToAsync(context.Response.Body, context.RequestAborted);
191+
}
192+
123193
public async Task HandleDeleteRequestAsync(HttpContext context)
124194
{
125195
var sessionId = context.Request.Headers[McpSessionIdHeaderName].ToString();
@@ -131,14 +201,7 @@ public async Task HandleDeleteRequestAsync(HttpContext context)
131201

132202
private async ValueTask<StreamableHttpSession?> GetSessionAsync(HttpContext context, string sessionId)
133203
{
134-
StreamableHttpSession? session;
135-
136-
if (string.IsNullOrEmpty(sessionId))
137-
{
138-
await WriteJsonRpcErrorAsync(context, "Bad Request: Mcp-Session-Id header is required", StatusCodes.Status400BadRequest);
139-
return null;
140-
}
141-
else if (!sessionManager.TryGetValue(sessionId, out session))
204+
if (!sessionManager.TryGetValue(sessionId, out var session))
142205
{
143206
// -32001 isn't part of the MCP standard, but this is what the typescript-sdk currently does.
144207
// One of the few other usages I found was from some Ethereum JSON-RPC documentation and this
@@ -194,12 +257,16 @@ private async ValueTask<StreamableHttpSession> StartNewSessionAsync(HttpContext
194257
{
195258
SessionId = sessionId,
196259
FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext,
260+
EventStreamStore = HttpServerTransportOptions.EventStreamStore,
261+
RetryInterval = HttpServerTransportOptions.RetryInterval,
197262
};
198263
context.Response.Headers[McpSessionIdHeaderName] = sessionId;
199264
}
200265
else
201266
{
202267
// In stateless mode, each request is independent. Don't set any session ID on the transport.
268+
// If in the future we support resuming stateless requests, we should populate
269+
// the event stream store and retry interval here as well.
203270
sessionId = "";
204271
transport = new()
205272
{
@@ -246,6 +313,28 @@ private async ValueTask<StreamableHttpSession> CreateSessionAsync(
246313
return session;
247314
}
248315

316+
private async ValueTask<ISseEventStreamReader?> GetEventStreamReaderAsync(HttpContext context, string lastEventId)
317+
{
318+
if (HttpServerTransportOptions.EventStreamStore is not { } eventStreamStore)
319+
{
320+
await WriteJsonRpcErrorAsync(context,
321+
"Bad Request: This server does not support resuming streams.",
322+
StatusCodes.Status400BadRequest);
323+
return null;
324+
}
325+
326+
var eventStreamReader = await eventStreamStore.GetStreamReaderAsync(lastEventId, context.RequestAborted);
327+
if (eventStreamReader is null)
328+
{
329+
await WriteJsonRpcErrorAsync(context,
330+
"Bad Request: The specified Last-Event-ID is either invalid or expired.",
331+
StatusCodes.Status400BadRequest);
332+
return null;
333+
}
334+
335+
return eventStreamReader;
336+
}
337+
249338
private static Task WriteJsonRpcErrorAsync(HttpContext context, string errorMessage, int statusCode, int errorCode = -32000)
250339
{
251340
var jsonRpcError = new JsonRpcError

src/ModelContextProtocol.Core/McpSessionHandler.cs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,32 @@ internal sealed partial class McpSessionHandler : IAsyncDisposable
2929
"mcp.server.operation.duration", "Measures the duration of inbound message processing.", longBuckets: false);
3030

3131
/// <summary>The latest version of the protocol supported by this implementation.</summary>
32-
internal const string LatestProtocolVersion = "2025-06-18";
32+
internal const string LatestProtocolVersion = "2025-11-25";
3333

3434
/// <summary>All protocol versions supported by this implementation.</summary>
3535
internal static readonly string[] SupportedProtocolVersions =
3636
[
3737
"2024-11-05",
3838
"2025-03-26",
39+
"2025-06-18",
3940
LatestProtocolVersion,
4041
];
4142

43+
/// <summary>
44+
/// Checks if the given protocol version supports priming events.
45+
/// </summary>
46+
/// <param name="protocolVersion">The protocol version to check.</param>
47+
/// <returns>True if the protocol version supports resumability.</returns>
48+
/// <remarks>
49+
/// Priming events are only supported in protocol version &gt;= 2025-11-25.
50+
/// Older clients may crash when receiving SSE events with empty data.
51+
/// </remarks>
52+
internal static bool SupportsPrimingEvent(string? protocolVersion)
53+
{
54+
const string MinResumabilityProtocolVersion = "2025-11-25";
55+
return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0;
56+
}
57+
4258
private readonly bool _isServer;
4359
private readonly string _transportKind;
4460
private readonly ITransport _transport;

src/ModelContextProtocol.Core/Server/SseResponseStreamTransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task SendMessageAsync(JsonRpcMessage message, CancellationToken can
6767
{
6868
Throw.IfNull(message);
6969
// If the underlying writer has been disposed, just drop the message.
70-
await _sseWriter.SendMessageAsync(message, cancellationToken).ConfigureAwait(false);
70+
await _sseWriter.SendMessageAsync(message, eventStreamWriter: null, cancellationToken).ConfigureAwait(false);
7171
}
7272

7373
/// <summary>

src/ModelContextProtocol.Core/Server/SseWriter.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ internal sealed class SseWriter(string? messageEndpoint = null, BoundedChannelOp
2222
private readonly SemaphoreSlim _disposeLock = new(1, 1);
2323
private bool _disposed;
2424

25-
public Func<IAsyncEnumerable<SseItem<JsonRpcMessage?>>, CancellationToken, IAsyncEnumerable<SseItem<JsonRpcMessage?>>>? MessageFilter { get; set; }
26-
2725
public Task WriteAllAsync(Stream sseResponseStream, CancellationToken cancellationToken)
2826
{
2927
Throw.IfNull(sseResponseStream);
@@ -38,30 +36,48 @@ public Task WriteAllAsync(Stream sseResponseStream, CancellationToken cancellati
3836
_writeCancellationToken = cancellationToken;
3937

4038
var messages = _messages.Reader.ReadAllAsync(cancellationToken);
41-
if (MessageFilter is not null)
42-
{
43-
messages = MessageFilter(messages, cancellationToken);
44-
}
45-
4639
_writeTask = SseFormatter.WriteAsync(messages, sseResponseStream, WriteJsonRpcMessageToBuffer, cancellationToken);
4740
return _writeTask;
4841
}
4942

50-
public async Task<bool> SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
43+
public Task<bool> SendPrimingEventAsync(TimeSpan retryInterval, ISseEventStreamWriter eventStreamWriter, CancellationToken cancellationToken = default)
44+
{
45+
// Create a priming event: empty data with an event ID
46+
var primingItem = new SseItem<JsonRpcMessage?>(null, "prime")
47+
{
48+
ReconnectionInterval = retryInterval,
49+
};
50+
51+
return SendMessageAsync(primingItem, eventStreamWriter, cancellationToken);
52+
}
53+
54+
public Task<bool> SendMessageAsync(JsonRpcMessage message, ISseEventStreamWriter? eventStreamWriter, CancellationToken cancellationToken = default)
5155
{
5256
Throw.IfNull(message);
5357

58+
// Emit redundant "event: message" lines for better compatibility with other SDKs.
59+
return SendMessageAsync(new SseItem<JsonRpcMessage?>(message, SseParser.EventTypeDefault), eventStreamWriter, cancellationToken);
60+
}
61+
62+
private async Task<bool> SendMessageAsync(SseItem<JsonRpcMessage?> item, ISseEventStreamWriter? eventStreamWriter, CancellationToken cancellationToken = default)
63+
{
5464
using var _ = await _disposeLock.LockAsync(cancellationToken).ConfigureAwait(false);
5565

66+
if (eventStreamWriter is not null && item.EventId is null)
67+
{
68+
// Store the event first, even if the underlying writer has completed, so that
69+
// messages can still be retrieved from the event store.
70+
item = await eventStreamWriter.WriteEventAsync(item, cancellationToken: cancellationToken).ConfigureAwait(false);
71+
}
72+
5673
if (_disposed)
5774
{
5875
// Don't throw ObjectDisposedException here; just return false to indicate the message wasn't sent.
5976
// The calling transport can determine what to do in this case (drop the message, or fall back to another transport).
6077
return false;
6178
}
6279

63-
// Emit redundant "event: message" lines for better compatibility with other SDKs.
64-
await _messages.Writer.WriteAsync(new SseItem<JsonRpcMessage?>(message, SseParser.EventTypeDefault), cancellationToken).ConfigureAwait(false);
80+
await _messages.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
6581
return true;
6682
}
6783

@@ -101,7 +117,10 @@ private void WriteJsonRpcMessageToBuffer(SseItem<JsonRpcMessage?> item, IBufferW
101117
return;
102118
}
103119

104-
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.JsonContext.Default.JsonRpcMessage!);
120+
if (item.Data is not null)
121+
{
122+
JsonSerializer.Serialize(GetUtf8JsonWriter(writer), item.Data, McpJsonUtilities.JsonContext.Default.JsonRpcMessage!);
123+
}
105124
}
106125

107126
private Utf8JsonWriter GetUtf8JsonWriter(IBufferWriter<byte> writer)
@@ -118,3 +137,4 @@ private Utf8JsonWriter GetUtf8JsonWriter(IBufferWriter<byte> writer)
118137
return _jsonWriter;
119138
}
120139
}
140+

0 commit comments

Comments
 (0)