Skip to content

Commit 677d9ea

Browse files
committed
ISseEventStreamStore and test implementation
1 parent 1e407ce commit 677d9ea

File tree

8 files changed

+1077
-0
lines changed

8 files changed

+1077
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using ModelContextProtocol.Protocol;
2+
using System.Net.ServerSentEvents;
3+
4+
namespace ModelContextProtocol.Server;
5+
6+
/// <summary>
7+
/// Provides read access to an SSE event stream, allowing events to be consumed asynchronously.
8+
/// </summary>
9+
public interface ISseEventStreamReader
10+
{
11+
/// <summary>
12+
/// Gets the session ID associated with the stream being read.
13+
/// </summary>
14+
string SessionId { get; }
15+
16+
/// <summary>
17+
/// Gets the ID of the stream.
18+
/// </summary>
19+
/// <remarks>
20+
/// This value is guaranteed to be unique on a per-session basis.
21+
/// </remarks>
22+
string StreamId { get; }
23+
24+
/// <summary>
25+
/// Gets the messages from the stream as an <see cref="IAsyncEnumerable{T}"/>.
26+
/// </summary>
27+
/// <param name="cancellationToken">A token to cancel the operation.</param>
28+
/// <returns>An <see cref="IAsyncEnumerable{T}"/> of <see cref="SseItem{T}"/> containing JSON-RPC messages.</returns>
29+
/// <remarks>
30+
/// If the stream's mode is set to <see cref="SseEventStreamMode.Polling"/>, the returned
31+
/// messages will only include the currently-available events starting at the last event ID specified
32+
/// when the reader was created. Otherwise, the returned messages will continue until the associated
33+
/// <see cref="ISseEventStreamWriter"/> is disposed.
34+
/// </remarks>
35+
IAsyncEnumerable<SseItem<JsonRpcMessage?>> ReadEventsAsync(CancellationToken cancellationToken = default);
36+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace ModelContextProtocol.Server;
2+
3+
/// <summary>
4+
/// Provides storage and retrieval of SSE event streams, enabling resumability and redelivery of events.
5+
/// </summary>
6+
public interface ISseEventStreamStore
7+
{
8+
/// <summary>
9+
/// Creates a new SSE event stream with the specified options.
10+
/// </summary>
11+
/// <param name="options">The configuration options for the new stream.</param>
12+
/// <param name="cancellationToken">A token to cancel the operation.</param>
13+
/// <returns>A writer for the newly created event stream.</returns>
14+
ValueTask<ISseEventStreamWriter> CreateStreamAsync(SseEventStreamOptions options, CancellationToken cancellationToken = default);
15+
16+
/// <summary>
17+
/// Gets a reader for an existing event stream based on the last event ID.
18+
/// </summary>
19+
/// <param name="lastEventId">The ID of the last event received by the client, used to resume from that point.</param>
20+
/// <param name="cancellationToken">A token to cancel the operation.</param>
21+
/// <returns>A reader for the event stream, or <c>null</c> if no matching stream is found.</returns>
22+
ValueTask<ISseEventStreamReader?> GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken);
23+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using ModelContextProtocol.Protocol;
2+
using System.Net.ServerSentEvents;
3+
4+
namespace ModelContextProtocol.Server;
5+
6+
/// <summary>
7+
/// Provides write access to an SSE event stream, allowing events to be written and tracked with unique IDs.
8+
/// </summary>
9+
public interface ISseEventStreamWriter : IAsyncDisposable
10+
{
11+
/// <summary>
12+
/// Gets the ID of the stream.
13+
/// </summary>
14+
/// <remarks>
15+
/// This value is guaranteed to be unique on a per-session basis.
16+
/// </remarks>
17+
string StreamId { get; }
18+
19+
/// <summary>
20+
/// Gets the current mode of the event stream.
21+
/// </summary>
22+
SseEventStreamMode Mode { get; }
23+
24+
/// <summary>
25+
/// Sets the mode of the event stream.
26+
/// </summary>
27+
/// <param name="mode">The new mode to set for the event stream.</param>
28+
/// <param name="cancellationToken">A token to cancel the operation.</param>
29+
/// <returns>A task that represents the asynchronous operation.</returns>
30+
ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken cancellationToken = default);
31+
32+
/// <summary>
33+
/// Writes an event to the stream.
34+
/// </summary>
35+
/// <param name="sseItem">The original <see cref="SseItem{T}"/>.</param>
36+
/// <param name="cancellationToken">A token to cancel the operation.</param>
37+
/// <returns>A new <see cref="SseItem{T}"/> with a populated event ID.</returns>
38+
/// <remarks>
39+
/// If the provided <paramref name="sseItem"/> already has an event ID, this method skips writing the event.
40+
/// Otherwise, an event ID unique to all sessions and streams is generated and assigned to the event.
41+
/// </remarks>
42+
ValueTask<SseItem<JsonRpcMessage?>> WriteEventAsync(SseItem<JsonRpcMessage?> sseItem, CancellationToken cancellationToken = default);
43+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
namespace ModelContextProtocol.Server;
2+
3+
/// <summary>
4+
/// Represents the mode of an SSE event stream.
5+
/// </summary>
6+
public enum SseEventStreamMode
7+
{
8+
/// <summary>
9+
/// Causes the event stream returned by <see cref="ISseEventStreamReader.ReadEventsAsync(System.Threading.CancellationToken)"/> to only end when
10+
/// the associated <see cref="ISseEventStreamWriter"/> gets disposed.
11+
/// </summary>
12+
Default = 0,
13+
14+
/// <summary>
15+
/// Causes the event stream returned by <see cref="ISseEventStreamReader.ReadEventsAsync(System.Threading.CancellationToken)"/> to end
16+
/// after the most recent event has been consumed. This forces clients to keep making new requests in order to receive
17+
/// the latest messages.
18+
/// </summary>
19+
Polling = 1,
20+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
namespace ModelContextProtocol.Server;
2+
3+
/// <summary>
4+
/// Configuration options for creating an SSE event stream.
5+
/// </summary>
6+
public sealed class SseEventStreamOptions
7+
{
8+
/// <summary>
9+
/// Gets or sets the session ID associated with the event stream.
10+
/// </summary>
11+
public required string SessionId { get; set; }
12+
13+
/// <summary>
14+
/// Gets or sets the stream ID that uniquely identifies this stream within a session.
15+
/// </summary>
16+
public required string StreamId { get; set; }
17+
18+
/// <summary>
19+
/// Gets or sets the mode of the event stream. Defaults to <see cref="SseEventStreamMode.Default"/>.
20+
/// </summary>
21+
public SseEventStreamMode Mode { get; set; }
22+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using ModelContextProtocol.Protocol;
2+
using System.Buffers;
3+
using System.Net.ServerSentEvents;
4+
using System.Text.Json;
5+
6+
namespace ModelContextProtocol.Server;
7+
8+
/// <summary>
9+
/// Provides extension methods for <see cref="ISseEventStreamReader"/>.
10+
/// </summary>
11+
public static class SseEventStreamReaderExtensions
12+
{
13+
/// <summary>
14+
/// Copies all events from the reader to the destination stream in SSE format.
15+
/// </summary>
16+
/// <param name="reader">The event stream reader to copy events from.</param>
17+
/// <param name="destination">The destination stream to write SSE-formatted events to.</param>
18+
/// <param name="cancellationToken">A token to cancel the operation.</param>
19+
/// <returns>A task that represents the asynchronous copy operation.</returns>
20+
/// <exception cref="ArgumentNullException">Thrown when <paramref name="reader"/> or <paramref name="destination"/> is null.</exception>
21+
public static async Task CopyToAsync(this ISseEventStreamReader reader, Stream destination, CancellationToken cancellationToken = default)
22+
{
23+
Throw.IfNull(reader);
24+
Throw.IfNull(destination);
25+
26+
Utf8JsonWriter? jsonWriter = null;
27+
28+
var events = reader.ReadEventsAsync(cancellationToken);
29+
await SseFormatter.WriteAsync(events, destination, FormatEvent, cancellationToken);
30+
31+
void FormatEvent(SseItem<JsonRpcMessage?> item, IBufferWriter<byte> writer)
32+
{
33+
if (item.Data is null)
34+
{
35+
return;
36+
}
37+
38+
if (jsonWriter is null)
39+
{
40+
jsonWriter = new Utf8JsonWriter(writer);
41+
}
42+
else
43+
{
44+
jsonWriter.Reset(writer);
45+
}
46+
47+
JsonSerializer.Serialize(jsonWriter, item.Data, McpJsonUtilities.JsonContext.Default.JsonRpcMessage!);
48+
}
49+
}
50+
}

0 commit comments

Comments
 (0)