diff --git a/Directory.Packages.props b/Directory.Packages.props
index 2b1dbb7a..b0ae3c6a 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -11,6 +11,8 @@
+
+
diff --git a/src/ModelContextProtocol.Core/McpJsonUtilities.cs b/src/ModelContextProtocol.Core/McpJsonUtilities.cs
index b3d98dd0..081ce000 100644
--- a/src/ModelContextProtocol.Core/McpJsonUtilities.cs
+++ b/src/ModelContextProtocol.Core/McpJsonUtilities.cs
@@ -1,6 +1,7 @@
using Microsoft.Extensions.AI;
using ModelContextProtocol.Authentication;
using ModelContextProtocol.Protocol;
+using ModelContextProtocol.Server;
using System.Diagnostics.CodeAnalysis;
using System.Text.Json;
using System.Text.Json.Serialization;
@@ -158,6 +159,10 @@ internal static bool IsValidMcpToolSchema(JsonElement element)
[JsonSerializable(typeof(BlobResourceContents))]
[JsonSerializable(typeof(TextResourceContents))]
+ // Distributed cache event stream store
+ [JsonSerializable(typeof(DistributedCacheEventStreamStore.StreamMetadata))]
+ [JsonSerializable(typeof(DistributedCacheEventStreamStore.StoredEvent))]
+
// Other MCP Types
[JsonSerializable(typeof(IReadOnlyDictionary))]
[JsonSerializable(typeof(ProgressToken))]
diff --git a/src/ModelContextProtocol.Core/ModelContextProtocol.Core.csproj b/src/ModelContextProtocol.Core/ModelContextProtocol.Core.csproj
index 9e22a5c0..9e18c7b7 100644
--- a/src/ModelContextProtocol.Core/ModelContextProtocol.Core.csproj
+++ b/src/ModelContextProtocol.Core/ModelContextProtocol.Core.csproj
@@ -49,6 +49,7 @@
+
diff --git a/src/ModelContextProtocol.Core/Server/DistributedCacheEventIdFormatter.cs b/src/ModelContextProtocol.Core/Server/DistributedCacheEventIdFormatter.cs
new file mode 100644
index 00000000..e54ac0b9
--- /dev/null
+++ b/src/ModelContextProtocol.Core/Server/DistributedCacheEventIdFormatter.cs
@@ -0,0 +1,59 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+// This is a shared source file included in both ModelContextProtocol.Core and the test project.
+// Do not reference symbols internal to the core project, as they won't be available in tests.
+
+using System.Text;
+
+namespace ModelContextProtocol.Server;
+
+/// <summary>
+/// Provides methods for formatting and parsing event IDs used by <see cref="DistributedCacheEventStreamStore"/>.
+/// </summary>
+/// <remarks>
+/// Event IDs are formatted as "{base64(sessionId)}:{base64(streamId)}:{sequence}".
+/// </remarks>
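+/// <example>
+/// For illustration, <c>Format("sess", "stream", 5)</c> produces
+/// <c>"c2Vzcw==:c3RyZWFt:5"</c>, which <see cref="TryParse"/> splits back into the
+/// original session ID, stream ID, and sequence.
+/// </example>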
+internal static class DistributedCacheEventIdFormatter
+{
+ private const char Separator = ':';
+
+ /// <summary>
+ /// Formats session ID, stream ID, and sequence number into an event ID string.
+ /// </summary>
+ public static string Format(string sessionId, string streamId, long sequence)
+ {
+ // Base64-encode session and stream IDs so the event ID can be parsed
+ // even if the original IDs contain the ':' separator character
+ var sessionBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(sessionId));
+ var streamBase64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(streamId));
+ return $"{sessionBase64}{Separator}{streamBase64}{Separator}{sequence}";
+ }
+
+ /// <summary>
+ /// Attempts to parse an event ID into its component parts.
+ /// </summary>
+ public static bool TryParse(string eventId, out string sessionId, out string streamId, out long sequence)
+ {
+ sessionId = string.Empty;
+ streamId = string.Empty;
+ sequence = 0;
+
+ var parts = eventId.Split(Separator);
+ if (parts.Length != 3)
+ {
+ return false;
+ }
+
+ try
+ {
+ sessionId = Encoding.UTF8.GetString(Convert.FromBase64String(parts[0]));
+ streamId = Encoding.UTF8.GetString(Convert.FromBase64String(parts[1]));
+ return long.TryParse(parts[2], out sequence);
+ }
+ catch
+ {
+ return false;
+ }
+ }
+}
diff --git a/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStore.cs b/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStore.cs
new file mode 100644
index 00000000..2ba3f834
--- /dev/null
+++ b/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStore.cs
@@ -0,0 +1,300 @@
+using Microsoft.Extensions.Caching.Distributed;
+using ModelContextProtocol.Protocol;
+using System.Net.ServerSentEvents;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
+
+namespace ModelContextProtocol.Server;
+
+/// <summary>
+/// An <see cref="ISseEventStreamStore"/> implementation backed by <see cref="IDistributedCache"/>.
+/// </summary>
+/// <remarks>
+/// <para>
+/// This implementation stores SSE events in a distributed cache, enabling resumability across
+/// multiple server instances. Event IDs are encoded with session, stream, and sequence information
+/// to allow efficient retrieval of events after a given point.
+/// </para>
+/// <para>
+/// The writer maintains in-memory state for sequence number generation, as there is guaranteed
+/// to be only one writer per stream. Readers may be created from separate processes.
+/// </para>
+/// </remarks>
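+/// <example>
+/// A minimal usage sketch, for illustration only; it assumes an <see cref="IDistributedCache"/>
+/// named <c>cache</c> and a <see cref="JsonRpcMessage"/> named <c>message</c> are already in scope:
+/// <code>
+/// var store = new DistributedCacheEventStreamStore(cache);
+/// var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+/// {
+///     SessionId = "session-1",
+///     StreamId = "stream-1",
+///     Mode = SseEventStreamMode.Streaming,
+/// });
+///
+/// var written = await writer.WriteEventAsync(new SseItem&lt;JsonRpcMessage?&gt;(message));
+///
+/// // Later, possibly from another server instance, resume after the last event the client saw:
+/// var reader = await store.GetStreamReaderAsync(written.EventId!);
+/// await foreach (var item in reader!.ReadEventsAsync())
+/// {
+///     // Replay the stored event to the reconnected client.
+/// }
+/// </code>
+/// </example>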
+public sealed class DistributedCacheEventStreamStore : ISseEventStreamStore
+{
+ private readonly IDistributedCache _cache;
+ private readonly DistributedCacheEventStreamStoreOptions _options;
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="DistributedCacheEventStreamStore"/> class.
+ /// </summary>
+ /// <param name="cache">The distributed cache to use for storage.</param>
+ /// <param name="options">Optional configuration options for the store.</param>
+ public DistributedCacheEventStreamStore(IDistributedCache cache, DistributedCacheEventStreamStoreOptions? options = null)
+ {
+ Throw.IfNull(cache);
+ _cache = cache;
+ _options = options ?? new();
+ }
+
+ /// <inheritdoc />
+ public ValueTask<ISseEventStreamWriter> CreateStreamAsync(SseEventStreamOptions options, CancellationToken cancellationToken = default)
+ {
+ Throw.IfNull(options);
+ var writer = new DistributedCacheEventStreamWriter(_cache, options.SessionId, options.StreamId, options.Mode, _options);
+ return new ValueTask<ISseEventStreamWriter>(writer);
+ }
+
+ /// <inheritdoc />
+ public async ValueTask<ISseEventStreamReader?> GetStreamReaderAsync(string lastEventId, CancellationToken cancellationToken = default)
+ {
+ Throw.IfNull(lastEventId);
+
+ // Parse the event ID to get session, stream, and sequence information
+ if (!DistributedCacheEventIdFormatter.TryParse(lastEventId, out var sessionId, out var streamId, out var sequence))
+ {
+ return null;
+ }
+
+ // Check if the stream exists by looking for its metadata
+ var metadataKey = CacheKeys.StreamMetadata(sessionId, streamId);
+ var metadataBytes = await _cache.GetAsync(metadataKey, cancellationToken).ConfigureAwait(false);
+ if (metadataBytes is null)
+ {
+ return null;
+ }
+
+ var metadata = JsonSerializer.Deserialize(metadataBytes, McpJsonUtilities.JsonContext.Default.StreamMetadata);
+ if (metadata is null)
+ {
+ return null;
+ }
+
+ var startSequence = sequence + 1;
+ return new DistributedCacheEventStreamReader(_cache, sessionId, streamId, startSequence, metadata, _options);
+ }
+
+ /// <summary>
+ /// Provides methods for generating cache keys.
+ /// </summary>
+ internal static class CacheKeys
+ {
+ private const string Prefix = "mcp:sse:";
+
+ public static string StreamMetadata(string sessionId, string streamId) =>
+ $"{Prefix}meta:{sessionId}:{streamId}";
+
+ public static string Event(string eventId) =>
+ $"{Prefix}event:{eventId}";
+
+ public static string StreamEventCount(string sessionId, string streamId) =>
+ $"{Prefix}count:{sessionId}:{streamId}";
+ }
+
+ /// <summary>
+ /// Metadata about a stream stored in the cache.
+ /// </summary>
+ internal sealed class StreamMetadata
+ {
+ public SseEventStreamMode Mode { get; set; }
+ public bool IsCompleted { get; set; }
+ public long LastSequence { get; set; }
+ }
+
+ /// <summary>
+ /// Serialized representation of an SSE event stored in the cache.
+ /// </summary>
+ internal sealed class StoredEvent
+ {
+ public string? EventType { get; set; }
+ public string? EventId { get; set; }
+ public JsonRpcMessage? Data { get; set; }
+ }
+
+ private sealed class DistributedCacheEventStreamWriter : ISseEventStreamWriter
+ {
+ private readonly IDistributedCache _cache;
+ private readonly string _sessionId;
+ private readonly string _streamId;
+ private SseEventStreamMode _mode;
+ private readonly DistributedCacheEventStreamStoreOptions _options;
+ private long _sequence;
+ private bool _disposed;
+
+ public DistributedCacheEventStreamWriter(
+ IDistributedCache cache,
+ string sessionId,
+ string streamId,
+ SseEventStreamMode mode,
+ DistributedCacheEventStreamStoreOptions options)
+ {
+ _cache = cache;
+ _sessionId = sessionId;
+ _streamId = streamId;
+ _mode = mode;
+ _options = options;
+ }
+
+ public async ValueTask SetModeAsync(SseEventStreamMode mode, CancellationToken cancellationToken = default)
+ {
+ _mode = mode;
+ await UpdateMetadataAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask<SseItem<JsonRpcMessage?>> WriteEventAsync(SseItem<JsonRpcMessage?> sseItem, CancellationToken cancellationToken = default)
+ {
+ // Skip if already has an event ID
+ if (sseItem.EventId is not null)
+ {
+ return sseItem;
+ }
+
+ // Generate a new sequence number and event ID
+ var sequence = Interlocked.Increment(ref _sequence);
+ var eventId = DistributedCacheEventIdFormatter.Format(_sessionId, _streamId, sequence);
+ var newItem = sseItem with { EventId = eventId };
+
+ // Store the event in the cache
+ var storedEvent = new StoredEvent
+ {
+ EventType = newItem.EventType,
+ EventId = eventId,
+ Data = newItem.Data,
+ };
+
+ var eventBytes = JsonSerializer.SerializeToUtf8Bytes(storedEvent, McpJsonUtilities.JsonContext.Default.StoredEvent);
+ var eventKey = CacheKeys.Event(eventId);
+
+ await _cache.SetAsync(eventKey, eventBytes, new DistributedCacheEntryOptions
+ {
+ SlidingExpiration = _options.EventSlidingExpiration,
+ AbsoluteExpirationRelativeToNow = _options.EventAbsoluteExpiration,
+ }, cancellationToken).ConfigureAwait(false);
+
+ // Update metadata with the latest sequence
+ await UpdateMetadataAsync(cancellationToken).ConfigureAwait(false);
+
+ return newItem;
+ }
+
+ private async ValueTask UpdateMetadataAsync(CancellationToken cancellationToken)
+ {
+ var metadata = new StreamMetadata
+ {
+ Mode = _mode,
+ IsCompleted = _disposed,
+ LastSequence = Interlocked.Read(ref _sequence),
+ };
+
+ var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadata, McpJsonUtilities.JsonContext.Default.StreamMetadata);
+ var metadataKey = CacheKeys.StreamMetadata(_sessionId, _streamId);
+
+ await _cache.SetAsync(metadataKey, metadataBytes, new DistributedCacheEntryOptions
+ {
+ SlidingExpiration = _options.MetadataSlidingExpiration,
+ AbsoluteExpirationRelativeToNow = _options.MetadataAbsoluteExpiration,
+ }, cancellationToken).ConfigureAwait(false);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed)
+ {
+ return;
+ }
+
+ _disposed = true;
+
+ // Mark the stream as completed in the metadata
+ await UpdateMetadataAsync(CancellationToken.None).ConfigureAwait(false);
+ }
+ }
+
+ private sealed class DistributedCacheEventStreamReader : ISseEventStreamReader
+ {
+ private readonly IDistributedCache _cache;
+ private readonly long _startSequence;
+ private readonly StreamMetadata _initialMetadata;
+ private readonly DistributedCacheEventStreamStoreOptions _options;
+
+ public DistributedCacheEventStreamReader(
+ IDistributedCache cache,
+ string sessionId,
+ string streamId,
+ long startSequence,
+ StreamMetadata initialMetadata,
+ DistributedCacheEventStreamStoreOptions options)
+ {
+ _cache = cache;
+ SessionId = sessionId;
+ StreamId = streamId;
+ _startSequence = startSequence;
+ _initialMetadata = initialMetadata;
+ _options = options;
+ }
+
+ public string SessionId { get; }
+ public string StreamId { get; }
+
+ public async IAsyncEnumerable<SseItem<JsonRpcMessage?>> ReadEventsAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ // Start from the sequence after the last received event
+ var currentSequence = _startSequence;
+
+ // Use the initial metadata passed to the constructor for the first read.
+ var lastSequence = _initialMetadata.LastSequence;
+ var isCompleted = _initialMetadata.IsCompleted;
+ var mode = _initialMetadata.Mode;
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ // Read all available events from currentSequence + 1 to lastSequence
+ for (; currentSequence <= lastSequence; currentSequence++)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ var eventId = DistributedCacheEventIdFormatter.Format(SessionId, StreamId, currentSequence);
+ var eventKey = CacheKeys.Event(eventId);
+ var eventBytes = await _cache.GetAsync(eventKey, cancellationToken).ConfigureAwait(false)
+ ?? throw new McpException($"SSE event with ID '{eventId}' was not found in the cache. The event may have expired.");
+
+ var storedEvent = JsonSerializer.Deserialize(eventBytes, McpJsonUtilities.JsonContext.Default.StoredEvent);
+ if (storedEvent is not null)
+ {
+ yield return new SseItem<JsonRpcMessage?>(storedEvent.Data, storedEvent.EventType)
+ {
+ EventId = storedEvent.EventId,
+ };
+ }
+ }
+
+ // If in polling mode, stop after returning currently available events
+ if (mode == SseEventStreamMode.Polling)
+ {
+ yield break;
+ }
+
+ // If the stream is completed and we've read all events, stop
+ if (isCompleted)
+ {
+ yield break;
+ }
+
+ // Wait before polling again for new events
+ await Task.Delay(_options.PollingInterval, cancellationToken).ConfigureAwait(false);
+
+ // Refresh metadata to get the latest sequence and completion status
+ var metadataKey = CacheKeys.StreamMetadata(SessionId, StreamId);
+ var metadataBytes = await _cache.GetAsync(metadataKey, cancellationToken).ConfigureAwait(false)
+ ?? throw new McpException($"Stream metadata for session '{SessionId}' and stream '{StreamId}' was not found in the cache. The metadata may have expired.");
+
+ var currentMetadata = JsonSerializer.Deserialize(metadataBytes, McpJsonUtilities.JsonContext.Default.StreamMetadata)
+ ?? throw new McpException($"Stream metadata for session '{SessionId}' and stream '{StreamId}' could not be deserialized.");
+
+ lastSequence = currentMetadata.LastSequence;
+ isCompleted = currentMetadata.IsCompleted;
+ mode = currentMetadata.Mode;
+ }
+ }
+ }
+}
diff --git a/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStoreOptions.cs b/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStoreOptions.cs
new file mode 100644
index 00000000..1c845213
--- /dev/null
+++ b/src/ModelContextProtocol.Core/Server/DistributedCacheEventStreamStoreOptions.cs
@@ -0,0 +1,51 @@
+namespace ModelContextProtocol.Server;
+
+/// <summary>
+/// Configuration options for <see cref="DistributedCacheEventStreamStore"/>.
+/// </summary>
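+/// <example>
+/// An illustrative configuration; the values below are examples rather than recommendations,
+/// and <c>cache</c> is assumed to be an existing <see cref="Microsoft.Extensions.Caching.Distributed.IDistributedCache"/>:
+/// <code>
+/// var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+/// {
+///     EventSlidingExpiration = TimeSpan.FromMinutes(15),
+///     EventAbsoluteExpiration = TimeSpan.FromHours(3),
+///     PollingInterval = TimeSpan.FromMilliseconds(50),
+/// });
+/// </code>
+/// </example>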
+public sealed class DistributedCacheEventStreamStoreOptions
+{
+ /// <summary>
+ /// Gets or sets the sliding expiration for individual events in the cache.
+ /// </summary>
+ /// <remarks>
+ /// Events are refreshed on each access. If an event is not accessed within this
+ /// time period, it may be evicted from the cache.
+ /// </remarks>
+ public TimeSpan? EventSlidingExpiration { get; set; } = TimeSpan.FromMinutes(30);
+
+ /// <summary>
+ /// Gets or sets the absolute expiration for individual events in the cache.
+ /// </summary>
+ /// <remarks>
+ /// Events will be evicted from the cache after this time period, regardless of access.
+ /// </remarks>
+ public TimeSpan? EventAbsoluteExpiration { get; set; } = TimeSpan.FromHours(2);
+
+ /// <summary>
+ /// Gets or sets the sliding expiration for stream metadata in the cache.
+ /// </summary>
+ /// <remarks>
+ /// Stream metadata includes mode and completion status. This should typically be
+ /// set to a longer duration than event expiration to allow for resumability.
+ /// </remarks>
+ public TimeSpan? MetadataSlidingExpiration { get; set; } = TimeSpan.FromHours(1);
+
+ /// <summary>
+ /// Gets or sets the absolute expiration for stream metadata in the cache.
+ /// </summary>
+ /// <remarks>
+ /// Stream metadata will be evicted from the cache after this time period, regardless of access.
+ /// </remarks>
+ public TimeSpan? MetadataAbsoluteExpiration { get; set; } = TimeSpan.FromHours(4);
+
+ /// <summary>
+ /// Gets or sets the interval between polling attempts when a reader is waiting for new events
+ /// in <see cref="SseEventStreamMode.Streaming"/> mode.
+ /// </summary>
+ /// <remarks>
+ /// This only affects readers. A shorter interval provides lower latency for new events
+ /// but increases cache access frequency.
+ /// </remarks>
+ public TimeSpan PollingInterval { get; set; } = TimeSpan.FromMilliseconds(100);
+}
diff --git a/tests/ModelContextProtocol.Tests/ModelContextProtocol.Tests.csproj b/tests/ModelContextProtocol.Tests/ModelContextProtocol.Tests.csproj
index e0fb3d1f..84b0ee99 100644
--- a/tests/ModelContextProtocol.Tests/ModelContextProtocol.Tests.csproj
+++ b/tests/ModelContextProtocol.Tests/ModelContextProtocol.Tests.csproj
@@ -29,6 +29,10 @@
+
+
+
+
@@ -41,6 +45,7 @@
+
diff --git a/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs
new file mode 100644
index 00000000..eb5d5c30
--- /dev/null
+++ b/tests/ModelContextProtocol.Tests/Server/DistributedCacheEventStreamStoreTests.cs
@@ -0,0 +1,1646 @@
+using Microsoft.Extensions.Caching.Distributed;
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.Options;
+using ModelContextProtocol.Protocol;
+using ModelContextProtocol.Server;
+using ModelContextProtocol.Tests.Utils;
+using System.Net.ServerSentEvents;
+
+namespace ModelContextProtocol.Tests.Server;
+
+/// <summary>
+/// Tests for <see cref="DistributedCacheEventStreamStore"/>.
+/// </summary>
+public class DistributedCacheEventStreamStoreTests(ITestOutputHelper testOutputHelper) : LoggedTest(testOutputHelper)
+{
+ private static CancellationToken CancellationToken => TestContext.Current.CancellationToken;
+
+ private static IDistributedCache CreateMemoryCache()
+ {
+ var options = Options.Create(new MemoryDistributedCacheOptions());
+ return new MemoryDistributedCache(options);
+ }
+
+ [Fact]
+ public void Constructor_ThrowsArgumentNullException_WhenCacheIsNull()
+ {
+ Assert.Throws("cache", () => new DistributedCacheEventStreamStore(null!));
+ }
+
+ [Fact]
+ public async Task CreateStreamAsync_ThrowsArgumentNullException_WhenOptionsIsNull()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Act & Assert
+ await Assert.ThrowsAsync("options",
+ async () => await store.CreateStreamAsync(null!, CancellationToken));
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_AssignsUniqueEventId_WhenItemHasNoEventId()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(null);
+
+ // Act
+ var result = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert
+ Assert.NotNull(result.EventId);
+ Assert.NotEmpty(result.EventId);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_SkipsAssigningEventId_WhenItemAlreadyHasEventId()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var existingEventId = "existing-event-id";
+ var item = new SseItem<JsonRpcMessage?>(null) { EventId = existingEventId };
+
+ // Act
+ var result = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert
+ Assert.Equal(existingEventId, result.EventId);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_PreservesDataProperty_InReturnedItem()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var message = new JsonRpcNotification { Method = "test/notification" };
+ var item = new SseItem<JsonRpcMessage?>(message);
+
+ // Act
+ var result = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert - Data should be preserved in the returned item (same reference)
+ Assert.Same(message, result.Data);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_PreservesEventTypeProperty_InReturnedItem()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var item = new SseItem(null, "custom-event-type");
+
+ // Act
+ var result = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert
+ Assert.Equal("custom-event-type", result.EventType);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_HandlesNullData_AssignsEventIdAndStoresEvent()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(null);
+
+ // Act
+ var result = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert - Event ID should be assigned
+ Assert.NotNull(result.EventId);
+
+ // Assert - Event should be retrievable
+ var reader = await store.GetStreamReaderAsync(result.EventId, CancellationToken);
+ Assert.NotNull(reader);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_StoresEventWithCorrectSlidingExpiration()
+ {
+ // Arrange - Use a mock cache to verify expiration options
+ var mockCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ EventSlidingExpiration = TimeSpan.FromMinutes(15)
+ };
+ var store = new DistributedCacheEventStreamStore(mockCache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(null);
+
+ // Act
+ await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert - Verify at least one call used the expected sliding expiration
+ Assert.Contains(mockCache.SetCalls, call =>
+ call.Key.Contains("event:") &&
+ call.Options.SlidingExpiration == TimeSpan.FromMinutes(15));
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_StoresEventWithCorrectAbsoluteExpiration()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ EventAbsoluteExpiration = TimeSpan.FromHours(3)
+ };
+ var store = new DistributedCacheEventStreamStore(mockCache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(null);
+
+ // Act
+ await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert
+ Assert.Contains(mockCache.SetCalls, call =>
+ call.Key.Contains("event:") &&
+ call.Options.AbsoluteExpirationRelativeToNow == TimeSpan.FromHours(3));
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_UpdatesStreamMetadata_AfterEachWrite()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var store = new DistributedCacheEventStreamStore(mockCache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(null);
+
+ // Act
+ await writer.WriteEventAsync(item, CancellationToken);
+
+ // Assert - Metadata should have been updated
+ Assert.Contains(mockCache.SetCalls, call => call.Key.Contains("meta:"));
+ }
+
+ [Fact]
+ public async Task SetModeAsync_PersistsModeChangeToMetadata()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var store = new DistributedCacheEventStreamStore(mockCache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ mockCache.SetCalls.Clear(); // Clear calls from CreateStreamAsync setup
+
+ // Act
+ await writer.SetModeAsync(SseEventStreamMode.Polling, CancellationToken);
+
+ // Assert - Metadata should have been updated with the new mode
+ Assert.Contains(mockCache.SetCalls, call => call.Key.Contains("meta:"));
+ }
+
+ [Fact]
+ public async Task SetModeAsync_ModeChangeReflectedInReader()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(10)
+ };
+ var store = new DistributedCacheEventStreamStore(cache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write an event to have something to read
+ var item = new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "test" });
+ var writtenItem = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Get a reader based on the event ID (starting at sequence 1, reader will wait for seq 2+)
+ var reader = await store.GetStreamReaderAsync(writtenItem.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Change mode to Polling while reader exists
+ await writer.SetModeAsync(SseEventStreamMode.Polling, CancellationToken);
+
+ // Assert - Reader should complete immediately in polling mode (no new events to read)
+ using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ }
+
+ // In polling mode, reader should complete without waiting for new events
+ Assert.Empty(events); // No events after the one we used to create the reader
+ }
+
+ [Fact]
+ public async Task DisposeAsync_MarksStreamAsCompleted()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write an event so we can get a reader
+ var item = new SseItem<JsonRpcMessage?>(null);
+ var writtenItem = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Act
+ await writer.DisposeAsync();
+
+ // Assert - Reader should see the stream as completed and exit immediately
+ var reader = await store.GetStreamReaderAsync(writtenItem.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ }
+
+ // The reader should complete without waiting for new events because stream is completed
+ Assert.Empty(events); // No new events after the one we used to create the reader
+ }
+
+ [Fact]
+ public async Task DisposeAsync_IsIdempotent()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Act - Call DisposeAsync multiple times
+ await writer.DisposeAsync();
+ await writer.DisposeAsync();
+ await writer.DisposeAsync();
+
+ // Assert - No exception thrown, operation is idempotent
+ // If we got here without exception, the test passes
+ }
+
+ [Fact]
+ public async Task DisposeAsync_UpdatesMetadata_WithIsCompletedFlag()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var store = new DistributedCacheEventStreamStore(mockCache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ mockCache.SetCalls.Clear(); // Clear calls from CreateStreamAsync
+
+ // Act
+ await writer.DisposeAsync();
+
+ // Assert - Metadata should have been updated
+ Assert.Contains(mockCache.SetCalls, call => call.Key.Contains("meta:"));
+ }
+
+ [Fact]
+ public async Task GetStreamReaderAsync_ThrowsArgumentNullException_WhenLastEventIdIsNull()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Act & Assert
+ await Assert.ThrowsAsync("lastEventId",
+ async () => await store.GetStreamReaderAsync(null!, CancellationToken));
+ }
+
+ [Fact]
+ public async Task GetStreamReaderAsync_ReturnsNull_WhenEventIdIsUnparseable()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Act - Try various invalid event ID formats
+ var result1 = await store.GetStreamReaderAsync("invalid-format", CancellationToken);
+ var result2 = await store.GetStreamReaderAsync("only:two:parts:here", CancellationToken);
+ var result3 = await store.GetStreamReaderAsync("", CancellationToken);
+
+ // Assert
+ Assert.Null(result1);
+ Assert.Null(result2);
+ Assert.Null(result3);
+ }
+
+ [Fact]
+ public async Task GetStreamReaderAsync_ReturnsNull_WhenStreamMetadataDoesNotExist()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Create a valid-looking event ID for a stream that doesn't exist
+ var fakeEventId = DistributedCacheEventIdFormatter.Format("nonexistent-session", "nonexistent-stream", 1);
+
+ // Act
+ var reader = await store.GetStreamReaderAsync(fakeEventId, CancellationToken);
+
+ // Assert
+ Assert.Null(reader);
+ }
+
+ [Fact]
+ public async Task GetStreamReaderAsync_ReturnsReaderWithCorrectSessionIdAndStreamId()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "my-session",
+ StreamId = "my-stream",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write an event to get a valid event ID
+ var item = new SseItem<JsonRpcMessage?>(null);
+ var writtenItem = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Act
+ var reader = await store.GetStreamReaderAsync(writtenItem.EventId!, CancellationToken);
+
+ // Assert
+ Assert.NotNull(reader);
+ Assert.Equal("my-session", reader.SessionId);
+ Assert.Equal("my-stream", reader.StreamId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_ReturnsEventsInOrder()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write multiple events
+ var event1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method1" }), CancellationToken);
+ var event2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method2" }), CancellationToken);
+ var event3 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method3" }), CancellationToken);
+
+ // Create a reader starting from before the first event (use a fake event ID with sequence 0)
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert - Events should be in order
+ Assert.Equal(3, events.Count);
+ Assert.Equal(event1.EventId, events[0].EventId);
+ Assert.Equal(event2.EventId, events[1].EventId);
+ Assert.Equal(event3.EventId, events[2].EventId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_ReturnsEmpty_WhenNoNewEventsExist()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write one event
+ var writtenItem = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Create a reader starting from the last event (so there are no new events to read)
+ var reader = await store.GetStreamReaderAsync(writtenItem.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert
+ Assert.Empty(events);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_PreservesCorrectDataEventTypeAndEventId()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var message = new JsonRpcNotification { Method = "test/method" };
+ var writtenItem = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(message, "custom-event-type"), CancellationToken);
+
+ // Create a reader starting from before the event
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert
+ Assert.Single(events);
+ var readEvent = events[0];
+ Assert.Equal(writtenItem.EventId, readEvent.EventId);
+ Assert.Equal("custom-event-type", readEvent.EventType);
+
+ var readMessage = Assert.IsType<JsonRpcNotification>(readEvent.Data);
+ Assert.Equal("test/method", readMessage.Method);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_HandlesNullData()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var writtenItem = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Create a reader starting from before the event
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert
+ Assert.Single(events);
+ Assert.Null(events[0].Data);
+ Assert.Equal(writtenItem.EventId, events[0].EventId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InPollingMode_CompletesImmediatelyAfterReturningAvailableEvents()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write events
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Create a reader from sequence 0
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Should complete quickly without waiting for new events
+ var stopwatch = System.Diagnostics.Stopwatch.StartNew();
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+ stopwatch.Stop();
+
+ // Assert - Should have returned both events and completed quickly
+ Assert.Equal(2, events.Count);
+ Assert.True(stopwatch.ElapsedMilliseconds < 500, $"Polling mode should complete quickly, took {stopwatch.ElapsedMilliseconds}ms");
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InPollingMode_ReturnsOnlyEventsAfterLastEventId()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write 3 events
+ var event1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event3 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Create a reader starting from event2 (should only return event3)
+ var reader = await store.GetStreamReaderAsync(event2.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert - Only event3 should be returned
+ Assert.Single(events);
+ Assert.Equal(event3.EventId, events[0].EventId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InPollingMode_ReturnsEmptyIfNoNewEvents()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write one event and create a reader from that event (no events after it)
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert - No new events should be returned
+ Assert.Empty(events);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InPollingMode_DoesNotWaitForNewEvents()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write one event so we have a valid event ID, then create reader from it
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Should complete immediately without waiting (no new events after the one we started from)
+ var stopwatch = System.Diagnostics.Stopwatch.StartNew();
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+ stopwatch.Stop();
+
+ // Assert - Should complete quickly with no events
+ Assert.Empty(events);
+ Assert.True(stopwatch.ElapsedMilliseconds < 500, $"Polling mode should complete quickly, took {stopwatch.ElapsedMilliseconds}ms");
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InStreamingMode_WaitsForNewEvents()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write one event so we have a valid event ID
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Start reading and then write a new event
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(2));
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ var readTask = Task.Run(async () =>
+ {
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ if (events.Count >= 1)
+ {
+ // Got the event we were waiting for, cancel to stop
+ await cts.CancelAsync();
+ }
+ }
+ }, CancellationToken);
+
+ // Write a new event - the reader should pick it up since it's in streaming mode
+ // and won't complete until cancelled or the stream is disposed
+ var newEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Wait for read to complete (either event received or timeout)
+ try
+ {
+ await readTask;
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected when we cancel after receiving event
+ }
+
+ // Assert - Should have received the new event
+ Assert.Single(events);
+ Assert.Equal(newEvent.EventId, events[0].EventId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InStreamingMode_YieldsNewlyWrittenEvents()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write initial event
+ var initialEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(initialEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Write multiple events while reader is active
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(3));
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ var readTask = Task.Run(async () =>
+ {
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ if (events.Count >= 3)
+ {
+ await cts.CancelAsync();
+ }
+ }
+ }, CancellationToken);
+
+ // Write 3 new events - the reader should pick them up since it's in streaming mode
+ var event1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event3 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ try
+ {
+ await readTask;
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected
+ }
+
+ // Assert - Should have received all 3 events in order
+ Assert.Equal(3, events.Count);
+ Assert.Equal(event1.EventId, events[0].EventId);
+ Assert.Equal(event2.EventId, events[1].EventId);
+ Assert.Equal(event3.EventId, events[2].EventId);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InStreamingMode_CompletesWhenStreamIsDisposed()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write event to create a valid reader
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Start reading, then dispose the stream
+ var readTask = Task.Run(async () =>
+ {
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ }
+ }, CancellationToken);
+
+ // Dispose the writer - the reader should detect this and exit gracefully
+ await writer.DisposeAsync();
+
+ // Assert - The read should complete gracefully within timeout
+ using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ timeoutCts.CancelAfter(TimeSpan.FromSeconds(2));
+ await readTask.WaitAsync(timeoutCts.Token);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_InStreamingMode_RespectsCancellation()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write event to create a valid reader
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Start reading and then cancel
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ var messageReceivedTcs = new TaskCompletionSource<bool>();
+ var continueReadingTcs = new TaskCompletionSource<bool>();
+ OperationCanceledException? capturedException = null;
+
+ var readTask = Task.Run(async () =>
+ {
+ try
+ {
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ messageReceivedTcs.SetResult(true);
+ await continueReadingTcs.Task;
+ }
+ }
+ catch (OperationCanceledException ex)
+ {
+ capturedException = ex;
+ }
+ }, CancellationToken);
+
+ // Write a message for the reader to consume
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Wait for the first message to be received
+ await messageReceivedTcs.Task;
+
+ // Cancel so that ReadEventsAsync throws before reading the next message
+ await cts.CancelAsync();
+
+ // Allow the message reader to continue
+ continueReadingTcs.SetResult(true);
+
+ // Wait for read task to complete
+ await readTask;
+
+ Assert.Single(events);
+ Assert.NotNull(capturedException);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_RespectsModeSwitchFromStreamingToPolling()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write an event to create a valid reader
+ var writtenEvent = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var reader = await store.GetStreamReaderAsync(writtenEvent.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Start reading in streaming mode (will wait for new events)
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(3));
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ var readCompleted = false;
+
+ var readTask = Task.Run(async () =>
+ {
+ await foreach (var evt in reader.ReadEventsAsync(cts.Token))
+ {
+ events.Add(evt);
+ }
+ readCompleted = true;
+ }, CancellationToken);
+
+ // Switch to polling mode - the reader should detect this and exit
+ await writer.SetModeAsync(SseEventStreamMode.Polling, CancellationToken);
+
+ // Assert - Read should complete within timeout after switching to polling mode
+ using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken);
+ timeoutCts.CancelAfter(TimeSpan.FromSeconds(1));
+ await readTask.WaitAsync(timeoutCts.Token);
+ Assert.True(readCompleted);
+ Assert.Empty(events); // No new events were written after the one we used to create the reader
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_PollingModeReturnsEventsThenCompletes()
+ {
+ // Arrange - Start in default mode, write some events, switch to polling, reader should return remaining events
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache, new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(50)
+ });
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming
+ }, CancellationToken);
+
+ // Write initial event and create reader from sequence 0
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+
+ // Write events first
+ var event1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Switch to polling mode
+ await writer.SetModeAsync(SseEventStreamMode.Polling, CancellationToken);
+
+ // Get reader
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Act - Read should return events and complete immediately
+ var stopwatch = System.Diagnostics.Stopwatch.StartNew();
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+ stopwatch.Stop();
+
+ // Assert
+ Assert.Equal(2, events.Count);
+ Assert.Equal(event1.EventId, events[0].EventId);
+ Assert.Equal(event2.EventId, events[1].EventId);
+ Assert.True(stopwatch.ElapsedMilliseconds < 500, $"Should complete quickly, took {stopwatch.ElapsedMilliseconds}ms");
+ }
+
+ [Fact]
+ public async Task MultipleStreams_AreIsolated_EventsDoNotLeakBetweenStreams()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Create two streams with different session/stream IDs
+ var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-2",
+ StreamId = "stream-2",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write events to each stream
+ var event1 = await writer1.WriteEventAsync(new SseItem(null, "event-from-stream1"), CancellationToken);
+ var event2 = await writer2.WriteEventAsync(new SseItem(null, "event-from-stream2"), CancellationToken);
+
+ // Create readers for each stream from sequence 0
+ var start1 = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var start2 = DistributedCacheEventIdFormatter.Format("session-2", "stream-2", 0);
+
+ var reader1 = await store.GetStreamReaderAsync(start1, CancellationToken);
+ var reader2 = await store.GetStreamReaderAsync(start2, CancellationToken);
+ Assert.NotNull(reader1);
+ Assert.NotNull(reader2);
+
+ // Act - Read from each reader
+ var events1 = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader1.ReadEventsAsync(CancellationToken))
+ {
+ events1.Add(evt);
+ }
+
+ var events2 = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader2.ReadEventsAsync(CancellationToken))
+ {
+ events2.Add(evt);
+ }
+
+ // Assert - Each reader should only see its own stream's events
+ Assert.Single(events1);
+ Assert.Equal("event-from-stream1", events1[0].EventType);
+ Assert.Equal(event1.EventId, events1[0].EventId);
+
+ Assert.Single(events2);
+ Assert.Equal("event-from-stream2", events2[0].EventType);
+ Assert.Equal(event2.EventId, events2[0].EventId);
+ }
+
+ [Fact]
+ public async Task MultipleStreams_SameSession_DifferentStreamIds_AreIsolated()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ // Create two streams with same session but different stream IDs
+ var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "shared-session",
+ StreamId = "stream-A",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "shared-session",
+ StreamId = "stream-B",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Write events to each stream
+ await writer1.WriteEventAsync(new SseItem(null, "from-A"), CancellationToken);
+ await writer2.WriteEventAsync(new SseItem(null, "from-B"), CancellationToken);
+
+ // Create readers from sequence 0
+ var reader1 = await store.GetStreamReaderAsync(DistributedCacheEventIdFormatter.Format("shared-session", "stream-A", 0), CancellationToken);
+ var reader2 = await store.GetStreamReaderAsync(DistributedCacheEventIdFormatter.Format("shared-session", "stream-B", 0), CancellationToken);
+ Assert.NotNull(reader1);
+ Assert.NotNull(reader2);
+
+ // Act
+ var events1 = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader1.ReadEventsAsync(CancellationToken))
+ {
+ events1.Add(evt);
+ }
+
+ var events2 = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader2.ReadEventsAsync(CancellationToken))
+ {
+ events2.Add(evt);
+ }
+
+ // Assert
+ Assert.Single(events1);
+ Assert.Equal("from-A", events1[0].EventType);
+
+ Assert.Single(events2);
+ Assert.Equal("from-B", events2[0].EventType);
+ }
+
+ [Fact]
+ public async Task EventIds_AreGloballyUnique_AcrossStreams()
+ {
+ // Arrange
+ var cache = CreateMemoryCache();
+ var store = new DistributedCacheEventStreamStore(cache);
+
+ var writer1 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var writer2 = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-2",
+ StreamId = "stream-2",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Act - Write events to each stream
+ var event1a = await writer1.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event1b = await writer1.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event2a = await writer2.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+ var event2b = await writer2.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Assert - All event IDs should be unique
+ var allEventIds = new[] { event1a.EventId, event1b.EventId, event2a.EventId, event2b.EventId };
+ Assert.Equal(4, allEventIds.Distinct().Count());
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_UsesConfiguredSlidingExpiration()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ EventSlidingExpiration = TimeSpan.FromMinutes(30)
+ };
+ var store = new DistributedCacheEventStreamStore(mockCache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ mockCache.SetCalls.Clear();
+
+ // Act
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Assert - Event should be written with the configured sliding expiration
+ Assert.Contains(mockCache.SetCalls, call =>
+ call.Key.Contains("event:") &&
+ call.Options.SlidingExpiration == TimeSpan.FromMinutes(30));
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_UsesConfiguredAbsoluteExpiration()
+ {
+ // Arrange
+ var mockCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ EventAbsoluteExpiration = TimeSpan.FromHours(6)
+ };
+ var store = new DistributedCacheEventStreamStore(mockCache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ mockCache.SetCalls.Clear();
+
+ // Act
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Assert - Event should be written with the configured absolute expiration (relative to now)
+ var eventCall = mockCache.SetCalls.FirstOrDefault(call => call.Key.Contains("event:"));
+ Assert.NotNull(eventCall.Key);
+ Assert.NotNull(eventCall.Options.AbsoluteExpirationRelativeToNow);
+ Assert.Equal(TimeSpan.FromHours(6), eventCall.Options.AbsoluteExpirationRelativeToNow);
+ }
+
+ [Fact]
+ public async Task WriteEventAsync_UsesConfiguredMetadataExpiration()
+ {
+ // Arrange - Metadata is written when events are written
+ var mockCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ MetadataSlidingExpiration = TimeSpan.FromMinutes(45),
+ MetadataAbsoluteExpiration = TimeSpan.FromHours(12)
+ };
+ var store = new DistributedCacheEventStreamStore(mockCache, customOptions);
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ // Act - Write an event, which also updates metadata
+ await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(null), CancellationToken);
+
+ // Assert
+ var metadataCall = mockCache.SetCalls.FirstOrDefault(call => call.Key.Contains("meta:"));
+ Assert.NotNull(metadataCall.Key);
+ Assert.Equal(TimeSpan.FromMinutes(45), metadataCall.Options.SlidingExpiration);
+ Assert.Equal(TimeSpan.FromHours(12), metadataCall.Options.AbsoluteExpirationRelativeToNow);
+ }
+
+ [Fact]
+ public void DefaultOptions_HaveReasonableDefaults()
+ {
+ // Arrange & Act
+ var options = new DistributedCacheEventStreamStoreOptions();
+
+ // Assert - Check that defaults are set reasonably
+ Assert.True(options.PollingInterval >= TimeSpan.FromMilliseconds(50), "Polling interval should be at least 50ms");
+ Assert.True(options.EventSlidingExpiration > TimeSpan.Zero, "Event sliding expiration should be positive");
+ Assert.True(options.EventAbsoluteExpiration > TimeSpan.Zero, "Event absolute expiration should be positive");
+ Assert.True(options.MetadataSlidingExpiration > TimeSpan.Zero, "Metadata sliding expiration should be positive");
+ Assert.True(options.MetadataAbsoluteExpiration > TimeSpan.Zero, "Metadata absolute expiration should be positive");
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_ThrowsMcpException_WhenMetadataExpires()
+ {
+ // Arrange - Use a cache that allows us to simulate metadata expiration
+ var trackingCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(10) // Fast polling so expired metadata is detected quickly
+ };
+ var store = new DistributedCacheEventStreamStore(trackingCache, customOptions);
+
+ // Create a stream and write an event
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Streaming // Non-polling mode to trigger the waiting loop
+ }, CancellationToken);
+
+ var item = new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "test" });
+ var writtenItem = await writer.WriteEventAsync(item, CancellationToken);
+
+ // Get a reader starting after the first event (so it will wait for more events)
+ var reader = await store.GetStreamReaderAsync(writtenItem.EventId!, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Now simulate metadata expiration
+ trackingCache.ExpireMetadata();
+
+ // Act & Assert - Reader should throw McpException when metadata expires
+ var exception = await Assert.ThrowsAsync<McpException>(async () =>
+ {
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ // Should not yield any events before throwing
+ }
+ });
+
+ Assert.Contains("session-1", exception.Message);
+ Assert.Contains("stream-1", exception.Message);
+ Assert.Contains("metadata", exception.Message, StringComparison.OrdinalIgnoreCase);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_ThrowsMcpException_WhenEventExpires()
+ {
+ // Arrange - Use a cache that allows us to simulate event expiration
+ var trackingCache = new TestDistributedCache();
+ var store = new DistributedCacheEventStreamStore(trackingCache);
+
+ // Create a stream and write multiple events
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var event1 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method1" }), CancellationToken);
+ var event2 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method2" }), CancellationToken);
+ var event3 = await writer.WriteEventAsync(new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "method3" }), CancellationToken);
+
+ // Create a reader starting from before the first event
+ var startEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(startEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // Simulate event2 expiring from the cache
+ trackingCache.ExpireEvent(event2.EventId!);
+
+ // Act & Assert - Reader should throw McpException when an event is missing
+ var exception = await Assert.ThrowsAsync<McpException>(async () =>
+ {
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+ });
+
+ Assert.Contains(event2.EventId!, exception.Message);
+ Assert.Contains("not found", exception.Message, StringComparison.OrdinalIgnoreCase);
+ }
+
+ [Fact]
+ public async Task ReadEventsAsync_DoesNotReadMetadata_InPollingMode()
+ {
+ // Arrange - Use a tracking cache to count metadata reads
+ var trackingCache = new TestDistributedCache();
+ var customOptions = new DistributedCacheEventStreamStoreOptions
+ {
+ PollingInterval = TimeSpan.FromMilliseconds(10)
+ };
+ var store = new DistributedCacheEventStreamStore(trackingCache, customOptions);
+
+ // Create a stream in POLLING mode - this allows the reader to exit after reading available events
+ var writer = await store.CreateStreamAsync(new SseEventStreamOptions
+ {
+ SessionId = "session-1",
+ StreamId = "stream-1",
+ Mode = SseEventStreamMode.Polling
+ }, CancellationToken);
+
+ var item1 = new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "test1" });
+ var item2 = new SseItem<JsonRpcMessage?>(new JsonRpcNotification { Method = "test2" });
+ await writer.WriteEventAsync(item1, CancellationToken);
+ await writer.WriteEventAsync(item2, CancellationToken);
+
+ // Get a reader starting before all events (use a fake event ID at sequence 0)
+ var zeroSequenceEventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 0);
+ var reader = await store.GetStreamReaderAsync(zeroSequenceEventId, CancellationToken);
+ Assert.NotNull(reader);
+
+ // GetStreamReaderAsync should have read metadata exactly once
+ Assert.Equal(1, trackingCache.MetadataReadCount);
+
+ // Act - Read all events
+ var events = new List<SseItem<JsonRpcMessage?>>();
+ await foreach (var evt in reader.ReadEventsAsync(CancellationToken))
+ {
+ events.Add(evt);
+ }
+
+ // Assert - In polling mode, the reader should:
+ // 1. Use initial metadata from GetStreamReaderAsync (no additional read needed)
+ // 2. Read all available events (2 events)
+ // 3. Exit immediately because mode is Polling
+ //
+ // Metadata read count should remain at 1 (only the initial read from GetStreamReaderAsync)
+ Assert.Equal(2, events.Count);
+ Assert.Equal(1, trackingCache.MetadataReadCount);
+ }
+
+ [Fact]
+ public void EventIdFormatter_Format_CreatesValidEventId()
+ {
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format("session-1", "stream-1", 42);
+
+ // Assert
+ Assert.NotNull(eventId);
+ Assert.NotEmpty(eventId);
+ Assert.Contains(":", eventId); // Should contain separators
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_RoundTripsSuccessfully()
+ {
+ // Arrange
+ var originalSessionId = "my-session-id";
+ var originalStreamId = "my-stream-id";
+ var originalSequence = 12345L;
+
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format(originalSessionId, originalStreamId, originalSequence);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out var sessionId, out var streamId, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(originalSessionId, sessionId);
+ Assert.Equal(originalStreamId, streamId);
+ Assert.Equal(originalSequence, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_HandlesSpecialCharactersInSessionId()
+ {
+ // Arrange - Session IDs can contain any visible ASCII character per MCP spec
+ var originalSessionId = "session:with:colons:and|pipes";
+ var originalStreamId = "stream-1";
+ var originalSequence = 1L;
+
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format(originalSessionId, originalStreamId, originalSequence);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out var sessionId, out var streamId, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(originalSessionId, sessionId);
+ Assert.Equal(originalStreamId, streamId);
+ Assert.Equal(originalSequence, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_HandlesSpecialCharactersInStreamId()
+ {
+ // Arrange
+ var originalSessionId = "session-1";
+ var originalStreamId = "stream:with:colons:and|special!chars@#$%";
+ var originalSequence = 1L;
+
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format(originalSessionId, originalStreamId, originalSequence);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out var sessionId, out var streamId, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(originalSessionId, sessionId);
+ Assert.Equal(originalStreamId, streamId);
+ Assert.Equal(originalSequence, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_HandlesUnicodeCharacters()
+ {
+ // Arrange
+ var originalSessionId = "session-日本語-émojis-🎉";
+ var originalStreamId = "stream-中文-العربية";
+ var originalSequence = 999L;
+
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format(originalSessionId, originalStreamId, originalSequence);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out var sessionId, out var streamId, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(originalSessionId, sessionId);
+ Assert.Equal(originalStreamId, streamId);
+ Assert.Equal(originalSequence, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_HandlesZeroSequence()
+ {
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format("session", "stream", 0);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out _, out _, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(0, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_HandlesLargeSequence()
+ {
+ // Act
+ var eventId = DistributedCacheEventIdFormatter.Format("session", "stream", long.MaxValue);
+ var parsed = DistributedCacheEventIdFormatter.TryParse(eventId, out _, out _, out var sequence);
+
+ // Assert
+ Assert.True(parsed);
+ Assert.Equal(long.MaxValue, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_ReturnsFalse_ForEmptyString()
+ {
+ // Act
+ var parsed = DistributedCacheEventIdFormatter.TryParse("", out var sessionId, out var streamId, out var sequence);
+
+ // Assert
+ Assert.False(parsed);
+ Assert.Equal(string.Empty, sessionId);
+ Assert.Equal(string.Empty, streamId);
+ Assert.Equal(0, sequence);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_ReturnsFalse_ForInvalidFormat()
+ {
+ // Act & Assert - Various invalid formats
+ Assert.False(DistributedCacheEventIdFormatter.TryParse("no-separators", out _, out _, out _));
+ Assert.False(DistributedCacheEventIdFormatter.TryParse("only:one", out _, out _, out _));
+ Assert.False(DistributedCacheEventIdFormatter.TryParse("too:many:parts:here", out _, out _, out _));
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_ReturnsFalse_ForInvalidBase64()
+ {
+ // Act - Invalid base64 in first part
+ var parsed = DistributedCacheEventIdFormatter.TryParse("!!!invalid!!!:c3RyZWFt:1", out _, out _, out _);
+
+ // Assert
+ Assert.False(parsed);
+ }
+
+ [Fact]
+ public void EventIdFormatter_TryParse_ReturnsFalse_ForNonNumericSequence()
+ {
+ // Arrange - Valid base64 but non-numeric sequence
+ var sessionBase64 = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes("session"));
+ var streamBase64 = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes("stream"));
+ var invalidEventId = $"{sessionBase64}:{streamBase64}:not-a-number";
+
+ // Act
+ var parsed = DistributedCacheEventIdFormatter.TryParse(invalidEventId, out _, out _, out _);
+
+ // Assert
+ Assert.False(parsed);
+ }
+
+ /// <summary>
+ /// A distributed cache that tracks all operations for verification in tests.
+ /// Supports tracking Set calls, counting metadata reads, and simulating metadata/event expiration.
+ /// </summary>
+ private sealed class TestDistributedCache : IDistributedCache
+ {
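+ // Actual storage is delegated to an in-memory MemoryDistributedCache; this wrapper only
+ // observes calls so tests can assert entry options and simulate expirations.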
+ private readonly MemoryDistributedCache _innerCache = new(Options.Create(new MemoryDistributedCacheOptions()));
+ private int _metadataReadCount;
+ private bool _metadataExpired;
+ private readonly HashSet<string> _expiredEventIds = [];
+
+ public List<(string Key, DistributedCacheEntryOptions Options)> SetCalls { get; } = [];
+ public int MetadataReadCount => _metadataReadCount;
+
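+ // Simulate cache eviction without waiting for real expirations: once flagged, the
+ // corresponding Get/GetAsync calls return null as if the entry had expired.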
+ public void ExpireMetadata() => _metadataExpired = true;
+ public void ExpireEvent(string eventId) => _expiredEventIds.Add(eventId);
+
+ public byte[]? Get(string key)
+ {
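+ // Keys containing "meta:" are treated as stream metadata entries: count each lookup
+ // and honor the simulated metadata expiration before falling through to the inner cache.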
+ if (key.Contains("meta:"))
+ {
+ Interlocked.Increment(ref _metadataReadCount);
+ if (_metadataExpired)
+ {
+ return null;
+ }
+ }
+ if (IsExpiredEvent(key))
+ {
+ return null;
+ }
+ return _innerCache.Get(key);
+ }
+
+ public Task<byte[]?> GetAsync(string key, CancellationToken token = default)
+ {
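+ // Mirrors Get so async callers observe the same metadata read counts and simulated expirations.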
+ if (key.Contains("meta:"))
+ {
+ Interlocked.Increment(ref _metadataReadCount);
+ if (_metadataExpired)
+ {
+ return Task.FromResult<byte[]?>(null);
+ }
+ }
+ if (IsExpiredEvent(key))
+ {
+ return Task.FromResult<byte[]?>(null);
+ }
+ return _innerCache.GetAsync(key, token);
+ }
+
+ private bool IsExpiredEvent(string key)
+ {
+ // Cache key format is "mcp:sse:event:{eventId}"
+ foreach (var expiredEventId in _expiredEventIds)
+ {
+ if (key.EndsWith(expiredEventId))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void Refresh(string key) => _innerCache.Refresh(key);
+ public Task RefreshAsync(string key, CancellationToken token = default) => _innerCache.RefreshAsync(key, token);
+ public void Remove(string key) => _innerCache.Remove(key);
+ public Task RemoveAsync(string key, CancellationToken token = default) => _innerCache.RemoveAsync(key, token);
+
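+ // Record every write together with its entry options so tests can assert the configured expirations.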
+ public void Set(string key, byte[] value, DistributedCacheEntryOptions options)
+ {
+ SetCalls.Add((key, options));
+ _innerCache.Set(key, value, options);
+ }
+
+ public Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default)
+ {
+ SetCalls.Add((key, options));
+ return _innerCache.SetAsync(key, value, options, token);
+ }
+ }
+}