Skip to content

Commit 9ddf34d

Browse files
test: add unit tests for SEP-1699 SSE retry field handling
Client tests: - Server-provided retry value used for reconnection delay - Fallback to exponential backoff when no retry value - Reconnection on graceful stream close with Last-Event-ID Server tests: - Send priming event with retry field when retryInterval configured - No priming event when retryInterval not configured - Event ID included in priming event for resumability
1 parent 8c741be commit 9ddf34d

File tree

3 files changed

+288
-7
lines changed

3 files changed

+288
-7
lines changed

src/client/streamableHttp.test.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,6 +1010,148 @@ describe('StreamableHTTPClientTransport', () => {
10101010
});
10111011
});
10121012

1013+
describe('SSE retry field handling (SEP-1699)', () => {
1014+
beforeEach(() => {
1015+
vi.useFakeTimers();
1016+
(global.fetch as Mock).mockReset();
1017+
});
1018+
afterEach(() => vi.useRealTimers());
1019+
1020+
it('should use server-provided retry value for reconnection delay', async () => {
1021+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1022+
reconnectionOptions: {
1023+
initialReconnectionDelay: 100,
1024+
maxReconnectionDelay: 5000,
1025+
reconnectionDelayGrowFactor: 2,
1026+
maxRetries: 3
1027+
}
1028+
});
1029+
1030+
// Create a stream that sends a retry field
1031+
const encoder = new TextEncoder();
1032+
const stream = new ReadableStream({
1033+
start(controller) {
1034+
// Send SSE event with retry field
1035+
const event =
1036+
'retry: 3000\nevent: message\nid: evt-1\ndata: {"jsonrpc": "2.0", "method": "notification", "params": {}}\n\n';
1037+
controller.enqueue(encoder.encode(event));
1038+
// Close stream to trigger reconnection
1039+
controller.close();
1040+
}
1041+
});
1042+
1043+
const fetchMock = global.fetch as Mock;
1044+
fetchMock.mockResolvedValueOnce({
1045+
ok: true,
1046+
status: 200,
1047+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1048+
body: stream
1049+
});
1050+
1051+
// Second request for reconnection
1052+
fetchMock.mockResolvedValueOnce({
1053+
ok: true,
1054+
status: 200,
1055+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1056+
body: new ReadableStream()
1057+
});
1058+
1059+
await transport.start();
1060+
await transport['_startOrAuthSse']({});
1061+
1062+
// Wait for stream to close and reconnection to be scheduled
1063+
await vi.advanceTimersByTimeAsync(100);
1064+
1065+
// Verify the server retry value was captured
1066+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1067+
expect(transportInternal._serverRetryMs).toBe(3000);
1068+
1069+
// Verify the delay calculation uses server retry value
1070+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1071+
expect(getDelay(0)).toBe(3000); // Should use server value, not 100ms initial
1072+
expect(getDelay(5)).toBe(3000); // Should still use server value for any attempt
1073+
});
1074+
1075+
it('should fall back to exponential backoff when no server retry value', () => {
1076+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1077+
reconnectionOptions: {
1078+
initialReconnectionDelay: 100,
1079+
maxReconnectionDelay: 5000,
1080+
reconnectionDelayGrowFactor: 2,
1081+
maxRetries: 3
1082+
}
1083+
});
1084+
1085+
// Without any SSE stream, _serverRetryMs should be undefined
1086+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1087+
expect(transportInternal._serverRetryMs).toBeUndefined();
1088+
1089+
// Should use exponential backoff
1090+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1091+
expect(getDelay(0)).toBe(100); // 100 * 2^0
1092+
expect(getDelay(1)).toBe(200); // 100 * 2^1
1093+
expect(getDelay(2)).toBe(400); // 100 * 2^2
1094+
expect(getDelay(10)).toBe(5000); // capped at max
1095+
});
1096+
1097+
it('should reconnect on graceful stream close', async () => {
1098+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1099+
reconnectionOptions: {
1100+
initialReconnectionDelay: 10,
1101+
maxReconnectionDelay: 1000,
1102+
reconnectionDelayGrowFactor: 1,
1103+
maxRetries: 1
1104+
}
1105+
});
1106+
1107+
// Create a stream that closes gracefully after sending an event with ID
1108+
const encoder = new TextEncoder();
1109+
const stream = new ReadableStream({
1110+
start(controller) {
1111+
// Send priming event with ID and retry field
1112+
const event = 'id: evt-1\nretry: 100\ndata: \n\n';
1113+
controller.enqueue(encoder.encode(event));
1114+
// Graceful close
1115+
controller.close();
1116+
}
1117+
});
1118+
1119+
const fetchMock = global.fetch as Mock;
1120+
fetchMock.mockResolvedValueOnce({
1121+
ok: true,
1122+
status: 200,
1123+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1124+
body: stream
1125+
});
1126+
1127+
// Second request for reconnection
1128+
fetchMock.mockResolvedValueOnce({
1129+
ok: true,
1130+
status: 200,
1131+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1132+
body: new ReadableStream()
1133+
});
1134+
1135+
await transport.start();
1136+
await transport['_startOrAuthSse']({});
1137+
1138+
// Wait for stream to process and close
1139+
await vi.advanceTimersByTimeAsync(50);
1140+
1141+
// Wait for reconnection delay (100ms from retry field)
1142+
await vi.advanceTimersByTimeAsync(150);
1143+
1144+
// Should have attempted reconnection
1145+
expect(fetchMock).toHaveBeenCalledTimes(2);
1146+
expect(fetchMock.mock.calls[0][1]?.method).toBe('GET');
1147+
expect(fetchMock.mock.calls[1][1]?.method).toBe('GET');
1148+
1149+
// Second call should include Last-Event-ID
1150+
const secondCallHeaders = fetchMock.mock.calls[1][1]?.headers;
1151+
expect(secondCallHeaders?.get('last-event-id')).toBe('evt-1');
1152+
});
1153+
});
1154+
10131155
describe('prevent infinite recursion when server returns 401 after successful auth', () => {
10141156
it('should throw error when server returns 401 after successful auth', async () => {
10151157
const message: JSONRPCMessage = {

src/client/streamableHttp.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,14 @@ export class StreamableHTTPClientTransport implements Transport {
320320
// Create a pipeline: binary stream -> text decoder -> SSE parser
321321
const reader = stream
322322
.pipeThrough(new TextDecoderStream() as ReadableWritablePair<string, Uint8Array>)
323-
.pipeThrough(new EventSourceParserStream({
324-
onRetry: (retryMs: number) => {
325-
// SEP-1699: Capture server-provided retry value for reconnection timing
326-
this._serverRetryMs = retryMs;
327-
}
328-
}))
323+
.pipeThrough(
324+
new EventSourceParserStream({
325+
onRetry: (retryMs: number) => {
326+
// SEP-1699: Capture server-provided retry value for reconnection timing
327+
this._serverRetryMs = retryMs;
328+
}
329+
})
330+
)
329331
.getReader();
330332

331333
while (true) {

src/server/streamableHttp.test.ts

Lines changed: 138 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ interface TestServerConfig {
3131
eventStore?: EventStore;
3232
onsessioninitialized?: (sessionId: string) => void | Promise<void>;
3333
onsessionclosed?: (sessionId: string) => void | Promise<void>;
34+
retryInterval?: number;
3435
}
3536

3637
/**
@@ -58,7 +59,8 @@ async function createTestServer(config: TestServerConfig = { sessionIdGenerator:
5859
enableJsonResponse: config.enableJsonResponse ?? false,
5960
eventStore: config.eventStore,
6061
onsessioninitialized: config.onsessioninitialized,
61-
onsessionclosed: config.onsessionclosed
62+
onsessionclosed: config.onsessionclosed,
63+
retryInterval: config.retryInterval
6264
});
6365

6466
await mcpServer.connect(transport);
@@ -1516,6 +1518,141 @@ describe('StreamableHTTPServerTransport in stateless mode', () => {
15161518
});
15171519
});
15181520

1521+
// Test SSE priming events (SEP-1699)
1522+
describe('StreamableHTTPServerTransport SSE priming events (SEP-1699)', () => {
1523+
let server: Server;
1524+
let transport: StreamableHTTPServerTransport;
1525+
let baseUrl: URL;
1526+
let sessionId: string;
1527+
1528+
afterEach(async () => {
1529+
if (server && transport) {
1530+
await stopTestServer({ server, transport });
1531+
}
1532+
});
1533+
1534+
it('should send priming event with retry field when retryInterval is configured', async () => {
1535+
const result = await createTestServer({
1536+
sessionIdGenerator: () => randomUUID(),
1537+
retryInterval: 5000
1538+
});
1539+
server = result.server;
1540+
transport = result.transport;
1541+
baseUrl = result.baseUrl;
1542+
1543+
// Initialize to get session ID
1544+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1545+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1546+
expect(sessionId).toBeDefined();
1547+
1548+
// Open a standalone SSE stream
1549+
const sseResponse = await fetch(baseUrl, {
1550+
method: 'GET',
1551+
headers: {
1552+
Accept: 'text/event-stream',
1553+
'mcp-session-id': sessionId,
1554+
'mcp-protocol-version': '2025-03-26'
1555+
}
1556+
});
1557+
1558+
expect(sseResponse.status).toBe(200);
1559+
expect(sseResponse.headers.get('content-type')).toBe('text/event-stream');
1560+
1561+
// Read the priming event
1562+
const reader = sseResponse.body?.getReader();
1563+
const { value } = await reader!.read();
1564+
const text = new TextDecoder().decode(value);
1565+
1566+
// Verify priming event has id and retry field
1567+
expect(text).toContain('id: ');
1568+
expect(text).toContain('retry: 5000');
1569+
expect(text).toContain('data: ');
1570+
});
1571+
1572+
it('should not send priming event when retryInterval is not configured', async () => {
1573+
const result = await createTestServer({
1574+
sessionIdGenerator: () => randomUUID()
1575+
// No retryInterval
1576+
});
1577+
server = result.server;
1578+
transport = result.transport;
1579+
baseUrl = result.baseUrl;
1580+
1581+
// Initialize to get session ID
1582+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1583+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1584+
expect(sessionId).toBeDefined();
1585+
1586+
// Open a standalone SSE stream
1587+
const sseResponse = await fetch(baseUrl, {
1588+
method: 'GET',
1589+
headers: {
1590+
Accept: 'text/event-stream',
1591+
'mcp-session-id': sessionId,
1592+
'mcp-protocol-version': '2025-03-26'
1593+
}
1594+
});
1595+
1596+
expect(sseResponse.status).toBe(200);
1597+
1598+
// Send a notification to get some output
1599+
const notification: JSONRPCMessage = {
1600+
jsonrpc: '2.0',
1601+
method: 'notifications/message',
1602+
params: { level: 'info', data: 'Test' }
1603+
};
1604+
await transport.send(notification);
1605+
1606+
// Read from stream
1607+
const reader = sseResponse.body?.getReader();
1608+
const { value } = await reader!.read();
1609+
const text = new TextDecoder().decode(value);
1610+
1611+
// Should NOT have retry field (since retryInterval not configured)
1612+
expect(text).not.toContain('retry:');
1613+
// Should have the notification
1614+
expect(text).toContain('Test');
1615+
});
1616+
1617+
it('should include event ID in priming event for resumability', async () => {
1618+
const result = await createTestServer({
1619+
sessionIdGenerator: () => randomUUID(),
1620+
retryInterval: 2000
1621+
});
1622+
server = result.server;
1623+
transport = result.transport;
1624+
baseUrl = result.baseUrl;
1625+
1626+
// Initialize to get session ID
1627+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1628+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1629+
1630+
// Open SSE stream
1631+
const sseResponse = await fetch(baseUrl, {
1632+
method: 'GET',
1633+
headers: {
1634+
Accept: 'text/event-stream',
1635+
'mcp-session-id': sessionId,
1636+
'mcp-protocol-version': '2025-03-26'
1637+
}
1638+
});
1639+
1640+
// Read priming event
1641+
const reader = sseResponse.body?.getReader();
1642+
const { value } = await reader!.read();
1643+
const text = new TextDecoder().decode(value);
1644+
1645+
// Extract event ID from priming event
1646+
const idMatch = text.match(/id: ([^\n]+)/);
1647+
expect(idMatch).toBeTruthy();
1648+
const eventId = idMatch![1];
1649+
1650+
// Event ID should be present and non-empty
1651+
expect(eventId).toBeTruthy();
1652+
expect(eventId.length).toBeGreaterThan(0);
1653+
});
1654+
});
1655+
15191656
// Test onsessionclosed callback
15201657
describe('StreamableHTTPServerTransport onsessionclosed callback', () => {
15211658
it('should call onsessionclosed callback when session is closed via DELETE', async () => {

0 commit comments

Comments
 (0)