Skip to content

Commit 348fac0

Browse files
committed
[WIP] PR feedback + cleanups
1 parent 7adbf9d commit 348fac0

File tree

14 files changed

+361
-727
lines changed

14 files changed

+361
-727
lines changed

src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@ public class HttpServerTransportOptions
5656
/// <item><description>Send priming events to establish resumability before any actual messages</description></item>
5757
/// </list>
5858
/// </remarks>
59-
public IEventStore? EventStore { get; set; }
59+
public ISseEventStore? EventStore { get; set; }
6060

6161
/// <summary>
6262
/// Gets or sets the retry interval to suggest to clients in SSE retry field.
6363
/// </summary>
6464
/// <value>
65-
/// The retry interval. The default is <see langword="null"/>, meaning no retry field is sent.
65+
/// The retry interval. The default is 5 seconds.
6666
/// </value>
6767
/// <remarks>
68-
/// When set along with <see cref="EventStore"/>, the server will include a retry field in priming events.
69-
/// This suggests to clients how long to wait before attempting to reconnect after a connection is lost.
68+
/// When <see cref="EventStore"/> is set, the server will include a retry field in priming events.
69+
/// This value suggests to clients how long to wait before attempting to reconnect after a connection is lost.
7070
/// Clients may use this value to implement polling behavior during long-running operations.
7171
/// </remarks>
72-
public TimeSpan? RetryInterval { get; set; }
72+
public TimeSpan RetryInterval { get; set; } = TimeSpan.FromSeconds(5);
7373

7474
/// <summary>
7575
/// Gets or sets a value that indicates whether the server uses a single execution context for the entire session.

src/ModelContextProtocol.Core/McpSessionHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ internal sealed partial class McpSessionHandler : IAsyncDisposable
4141
];
4242

4343
/// <summary>
44-
/// Checks if the given protocol version supports resumability (priming events).
44+
/// Checks if the given protocol version supports priming events.
4545
/// </summary>
4646
/// <param name="protocolVersion">The protocol version to check.</param>
4747
/// <returns>True if the protocol version supports resumability.</returns>
4848
/// <remarks>
4949
/// Priming events are only supported in protocol version &gt;= 2025-11-25.
5050
/// Older clients may crash when receiving SSE events with empty data.
5151
/// </remarks>
52-
internal static bool SupportsResumability(string? protocolVersion)
52+
internal static bool SupportsPrimingEvent(string? protocolVersion)
5353
{
5454
const string MinResumabilityProtocolVersion = "2025-11-25";
5555
return string.Compare(protocolVersion, MinResumabilityProtocolVersion, StringComparison.Ordinal) >= 0;

src/ModelContextProtocol.Core/Protocol/JsonRpcMessageContext.cs

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using ModelContextProtocol.Server;
22
using System.Security.Claims;
3-
using System.Text.Json.Serialization;
43

54
namespace ModelContextProtocol.Protocol;
65

@@ -58,45 +57,4 @@ public class JsonRpcMessageContext
5857
/// </para>
5958
/// </remarks>
6059
public ClaimsPrincipal? User { get; set; }
61-
62-
/// <summary>
63-
/// Gets or sets a callback that closes the SSE stream associated with the current JSON-RPC request.
64-
/// </summary>
65-
/// <remarks>
66-
/// <para>
67-
/// This callback implements the SSE polling pattern from SEP-1699. When invoked, it gracefully closes
68-
/// the SSE stream for the current request, signaling to the client that it should reconnect to receive
69-
/// the response. The server must have sent a priming event with an event ID before this callback is invoked.
70-
/// </para>
71-
/// <para>
72-
/// This is useful for long-running operations where the server wants to free resources while the operation
73-
/// is in progress. The client will reconnect with the Last-Event-ID header, and the server will replay
74-
/// any events that were sent after that ID.
75-
/// </para>
76-
/// <para>
77-
/// This callback is only available when using the Streamable HTTP transport with resumability enabled.
78-
/// For other transports, this property will be <see langword="null"/>.
79-
/// </para>
80-
/// </remarks>
81-
public Action? CloseSseStream { get; set; }
82-
83-
/// <summary>
84-
/// Gets or sets a callback that closes the standalone SSE stream for the current session.
85-
/// </summary>
86-
/// <remarks>
87-
/// <para>
88-
/// This callback closes the standalone SSE stream that is used for server-initiated messages
89-
/// (notifications and requests from server to client). Unlike <see cref="CloseSseStream"/>,
90-
/// this affects the session-level SSE stream rather than a request-specific stream.
91-
/// </para>
92-
/// <para>
93-
/// This is useful when the server needs to signal the client to reconnect its standalone SSE stream,
94-
/// for example during server restarts or resource cleanup.
95-
/// </para>
96-
/// <para>
97-
/// This callback is only available when using the Streamable HTTP transport.
98-
/// For other transports, this property will be <see langword="null"/>.
99-
/// </para>
100-
/// </remarks>
101-
public Action? CloseStandaloneSseStream { get; set; }
10260
}

src/ModelContextProtocol.Core/Server/IEventStore.cs renamed to src/ModelContextProtocol.Core/Server/ISseEventStore.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ namespace ModelContextProtocol.Server;
1919
/// Implementations should be thread-safe, as events may be stored and replayed concurrently.
2020
/// </para>
2121
/// </remarks>
22-
public interface IEventStore
22+
public interface ISseEventStore
2323
{
2424
/// <summary>
2525
/// Stores an event for later retrieval.
2626
/// </summary>
27+
/// <param name="sessionId">
28+
/// The ID of the session, or <c>null</c>.
29+
/// </param>
2730
/// <param name="streamId">
2831
/// The ID of the stream the event belongs to. This is typically the JSON-RPC request ID
2932
/// for POST SSE responses, or a special identifier for the standalone GET SSE stream.
@@ -34,23 +37,18 @@ public interface IEventStore
3437
/// </param>
3538
/// <param name="cancellationToken">A token to cancel the operation.</param>
3639
/// <returns>The generated event ID for the stored event.</returns>
37-
ValueTask<string> StoreEventAsync(string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
40+
ValueTask<string> StoreEventAsync(string sessionId, string streamId, JsonRpcMessage? message, CancellationToken cancellationToken = default);
3841

3942
/// <summary>
4043
/// Replays events that occurred after the specified event ID.
4144
/// </summary>
4245
/// <param name="lastEventId">The ID of the last event the client received.</param>
43-
/// <param name="sendCallback">
44-
/// A callback function to send each replayed event to the client.
45-
/// The callback receives the message and its event ID.
46-
/// </param>
4746
/// <param name="cancellationToken">A token to cancel the operation.</param>
4847
/// <returns>
49-
/// The stream ID of the replayed events if the event ID was found and events were replayed;
48+
/// An <see cref="SseReplayResult"/> containing the events to replay if the event ID was found;
5049
/// <see langword="null"/> if the event ID was not found in the store.
5150
/// </returns>
52-
ValueTask<string?> ReplayEventsAfterAsync(
51+
ValueTask<SseReplayResult?> GetEventsAfterAsync(
5352
string lastEventId,
54-
Func<JsonRpcMessage, string, CancellationToken, ValueTask> sendCallback,
5553
CancellationToken cancellationToken = default);
5654
}

src/ModelContextProtocol.Core/Server/RequestContext.cs

Lines changed: 0 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -81,52 +81,4 @@ public McpServer Server
8181
/// including the method name, parameters, request ID, and associated transport and user information.
8282
/// </remarks>
8383
public JsonRpcRequest JsonRpcRequest { get; }
84-
85-
/// <summary>
86-
/// Closes the SSE stream for the current request, signaling to the client that it should reconnect.
87-
/// </summary>
88-
/// <remarks>
89-
/// <para>
90-
/// This method implements the SSE polling pattern from SEP-1699. When called, it gracefully closes
91-
/// the SSE stream for the current request. For this to work correctly, the server must have sent
92-
/// a priming event with an event ID before this method is called (which happens automatically when
93-
/// resumability is enabled via <see cref="StreamableHttpServerTransport.EventStore"/>).
94-
/// </para>
95-
/// <para>
96-
/// After calling this method, the client will receive a stream end and should reconnect with the
97-
/// Last-Event-ID header. The server will then replay any events that were sent after that ID
98-
/// and continue streaming new events.
99-
/// </para>
100-
/// <para>
101-
/// This method only has an effect when using the Streamable HTTP transport with resumability enabled.
102-
/// For other transports or when resumability is not configured, this method does nothing.
103-
/// </para>
104-
/// </remarks>
105-
public void CloseSseStream()
106-
{
107-
JsonRpcRequest.Context?.CloseSseStream?.Invoke();
108-
}
109-
110-
/// <summary>
111-
/// Closes the standalone SSE stream for server-initiated messages.
112-
/// </summary>
113-
/// <remarks>
114-
/// <para>
115-
/// This method closes the standalone SSE stream that is used for server-initiated messages
116-
/// (notifications and requests from server to client). Unlike <see cref="CloseSseStream"/>,
117-
/// this affects the session-level SSE stream (from GET requests) rather than a request-specific stream.
118-
/// </para>
119-
/// <para>
120-
/// This is useful when the server needs to signal the client to reconnect its standalone SSE stream,
121-
/// for example during server restarts or resource cleanup.
122-
/// </para>
123-
/// <para>
124-
/// This method only has an effect when using the Streamable HTTP transport.
125-
/// For other transports, this method does nothing.
126-
/// </para>
127-
/// </remarks>
128-
public void CloseStandaloneSseStream()
129-
{
130-
JsonRpcRequest.Context?.CloseStandaloneSseStream?.Invoke();
131-
}
13284
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using ModelContextProtocol.Protocol;
2+
3+
namespace ModelContextProtocol.Server;
4+
5+
/// <summary>
6+
/// Represents the result of replaying SSE events to a client during resumption.
7+
/// </summary>
8+
/// <remarks>
9+
/// This class is returned by <see cref="ISseEventStore.GetEventsAfterAsync"/> when a client
10+
/// reconnects with a <c>Last-Event-ID</c> header. It contains the stream and session identifiers
11+
/// along with an async enumerable of events to replay.
12+
/// </remarks>
13+
public sealed class SseReplayResult
14+
{
15+
/// <summary>
16+
/// Gets the session ID that the events belong to.
17+
/// </summary>
18+
public required string SessionId { get; init; }
19+
20+
/// <summary>
21+
/// Gets the stream ID that the events belong to.
22+
/// </summary>
23+
/// <remarks>
24+
/// This is typically the JSON-RPC request ID for POST SSE responses,
25+
/// or a special identifier for the standalone GET SSE stream.
26+
/// </remarks>
27+
public required string StreamId { get; init; }
28+
29+
/// <summary>
30+
/// Gets the async enumerable of events to replay to the client.
31+
/// </summary>
32+
public required IAsyncEnumerable<StoredSseEvent> Events { get; init; }
33+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using ModelContextProtocol.Protocol;
2+
3+
namespace ModelContextProtocol.Server;
4+
5+
/// <summary>
6+
/// Wraps an <see cref="ISseEventStore"/> with session and stream context for a specific SSE stream.
7+
/// </summary>
8+
/// <remarks>
9+
/// This class simplifies event storage by binding the session ID, stream ID, and retry interval
10+
/// so that callers only need to provide the message when storing events.
11+
/// </remarks>
12+
internal sealed class SseStreamEventStore
13+
{
14+
private readonly ISseEventStore _eventStore;
15+
private readonly string _sessionId;
16+
private readonly string _streamId;
17+
private readonly TimeSpan _retryInterval;
18+
19+
/// <summary>
20+
/// Gets the retry interval to suggest to clients in SSE retry field.
21+
/// </summary>
22+
public TimeSpan RetryInterval => _retryInterval;
23+
24+
/// <summary>
25+
/// Initializes a new instance of the <see cref="SseStreamEventStore"/> class.
26+
/// </summary>
27+
/// <param name="eventStore">The underlying event store to use for storage.</param>
28+
/// <param name="sessionId">The session ID, or <see langword="null"/> to generate a new one.</param>
29+
/// <param name="streamId">The stream ID for this SSE stream.</param>
30+
/// <param name="retryInterval">The retry interval to suggest to clients.</param>
31+
public SseStreamEventStore(ISseEventStore eventStore, string? sessionId, string streamId, TimeSpan retryInterval)
32+
{
33+
_eventStore = eventStore;
34+
_sessionId = sessionId ?? Guid.NewGuid().ToString("N");
35+
_streamId = streamId;
36+
_retryInterval = retryInterval;
37+
}
38+
39+
/// <summary>
40+
/// Stores an event in the underlying event store with the bound session and stream context.
41+
/// </summary>
42+
/// <param name="message">The JSON-RPC message to store, or <see langword="null"/> for priming events.</param>
43+
/// <param name="cancellationToken">A token to cancel the operation.</param>
44+
/// <returns>The generated event ID for the stored event.</returns>
45+
public ValueTask<string> StoreEventAsync(JsonRpcMessage? message, CancellationToken cancellationToken = default)
46+
=> _eventStore.StoreEventAsync(_sessionId, _streamId, message, cancellationToken);
47+
}

0 commit comments

Comments
 (0)