Skip to content

Commit 449589c

Browse files
feat: add closeSseStream API for POST SSE polling (SEP-1699)
- Remove priming from GET streams (only POST streams per spec) - Replace closeStandaloneSseStream() with closeSseStream(requestId) - Add validation that retryInterval requires eventStore - Add tests for POST SSE priming events and stream closure - Restore original resumability tests without priming expectations
1 parent b4b7aa1 commit 449589c

File tree

2 files changed

+184
-96
lines changed

2 files changed

+184
-96
lines changed

src/server/streamableHttp.test.ts

Lines changed: 157 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ describe('StreamableHTTPServerTransport', () => {
331331

332332
// Parse the SSE event
333333
const eventLines = text.split('\n');
334-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
334+
const dataLine = eventLines.find(line => line.startsWith('data:'));
335335
expect(dataLine).toBeDefined();
336336

337337
const eventData = JSON.parse(dataLine!.substring(5));
@@ -369,7 +369,7 @@ describe('StreamableHTTPServerTransport', () => {
369369

370370
const text = await readSSEEvent(response);
371371
const eventLines = text.split('\n');
372-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
372+
const dataLine = eventLines.find(line => line.startsWith('data:'));
373373
expect(dataLine).toBeDefined();
374374

375375
const eventData = JSON.parse(dataLine!.substring(5));
@@ -424,7 +424,7 @@ describe('StreamableHTTPServerTransport', () => {
424424

425425
const text = await readSSEEvent(response);
426426
const eventLines = text.split('\n');
427-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
427+
const dataLine = eventLines.find(line => line.startsWith('data:'));
428428
expect(dataLine).toBeDefined();
429429

430430
const eventData = JSON.parse(dataLine!.substring(5));
@@ -507,7 +507,7 @@ describe('StreamableHTTPServerTransport', () => {
507507
const text = await readSSEEvent(sseResponse);
508508

509509
const eventLines = text.split('\n');
510-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
510+
const dataLine = eventLines.find(line => line.startsWith('data:'));
511511
expect(dataLine).toBeDefined();
512512

513513
const eventData = JSON.parse(dataLine!.substring(5));
@@ -1003,7 +1003,7 @@ describe('StreamableHTTPServerTransport with AuthInfo', () => {
10031003

10041004
const text = await readSSEEvent(response);
10051005
const eventLines = text.split('\n');
1006-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
1006+
const dataLine = eventLines.find(line => line.startsWith('data:'));
10071007
expect(dataLine).toBeDefined();
10081008

10091009
const eventData = JSON.parse(dataLine!.substring(5));
@@ -1039,7 +1039,7 @@ describe('StreamableHTTPServerTransport with AuthInfo', () => {
10391039

10401040
const text = await readSSEEvent(response);
10411041
const eventLines = text.split('\n');
1042-
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
1042+
const dataLine = eventLines.find(line => line.startsWith('data:'));
10431043
expect(dataLine).toBeDefined();
10441044

10451045
const eventData = JSON.parse(dataLine!.substring(5));
@@ -1358,14 +1358,6 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13581358
expect(sseResponse.status).toBe(200);
13591359
expect(sseResponse.headers.get('content-type')).toBe('text/event-stream');
13601360

1361-
// Read from the stream
1362-
const reader = sseResponse.body?.getReader();
1363-
1364-
// Read and validate the priming event
1365-
const { value: primingValue } = await reader!.read();
1366-
const primingText = new TextDecoder().decode(primingValue);
1367-
expect(primingText).toContain('id: ');
1368-
13691361
// Send a notification that should be stored with an event ID
13701362
const notification: JSONRPCMessage = {
13711363
jsonrpc: '2.0',
@@ -1376,7 +1368,8 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13761368
// Send the notification via transport
13771369
await transport.send(notification);
13781370

1379-
// Read the notification
1371+
// Read from the stream and verify we got the notification with an event ID
1372+
const reader = sseResponse.body?.getReader();
13801373
const { value } = await reader!.read();
13811374
const text = new TextDecoder().decode(value);
13821375

@@ -1413,12 +1406,6 @@ describe('StreamableHTTPServerTransport with resumability', () => {
14131406

14141407
// Read the notification from the SSE stream
14151408
const reader = sseResponse.body?.getReader();
1416-
1417-
// Read and validate the priming event
1418-
const { value: primingValue } = await reader!.read();
1419-
const primingText = new TextDecoder().decode(primingValue);
1420-
expect(primingText).toContain('id: ');
1421-
14221409
const { value } = await reader!.read();
14231410
const text = new TextDecoder().decode(value);
14241411

@@ -1549,29 +1536,76 @@ describe('StreamableHTTPServerTransport in stateless mode', () => {
15491536
});
15501537
});
15511538

1552-
// Test SSE priming events
1553-
describe('StreamableHTTPServerTransport SSE priming events', () => {
1539+
// Test retryInterval validation
1540+
describe('StreamableHTTPServerTransport retryInterval validation', () => {
1541+
it('should throw error when retryInterval is configured without eventStore', () => {
1542+
expect(() => {
1543+
new StreamableHTTPServerTransport({
1544+
sessionIdGenerator: () => randomUUID(),
1545+
retryInterval: 5000
1546+
// No eventStore
1547+
});
1548+
}).toThrow('retryInterval requires eventStore to be configured for resumability');
1549+
});
1550+
1551+
it('should not throw when retryInterval is configured with eventStore', async () => {
1552+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage }>();
1553+
const eventStore: EventStore = {
1554+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1555+
const eventId = `${streamId}_${randomUUID()}`;
1556+
storedEvents.set(eventId, { eventId, message });
1557+
return eventId;
1558+
},
1559+
async replayEventsAfter(
1560+
lastEventId: EventId,
1561+
{ send: _send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
1562+
): Promise<StreamId> {
1563+
const streamId = lastEventId.split('_')[0];
1564+
return streamId;
1565+
}
1566+
};
1567+
1568+
expect(() => {
1569+
new StreamableHTTPServerTransport({
1570+
sessionIdGenerator: () => randomUUID(),
1571+
eventStore,
1572+
retryInterval: 5000
1573+
});
1574+
}).not.toThrow();
1575+
});
1576+
});
1577+
1578+
// Test SSE priming events for POST streams (SEP-1699)
1579+
describe('StreamableHTTPServerTransport POST SSE priming events', () => {
15541580
let server: Server;
15551581
let transport: StreamableHTTPServerTransport;
15561582
let baseUrl: URL;
15571583
let sessionId: string;
1584+
let mcpServer: McpServer;
15581585

15591586
// Simple eventStore for priming event tests
15601587
const createEventStore = (): EventStore => {
1561-
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage }>();
1588+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage; streamId: string }>();
15621589
return {
15631590
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1564-
const eventId = `${streamId}_${randomUUID()}`;
1565-
storedEvents.set(eventId, { eventId, message });
1591+
const eventId = `${streamId}::${Date.now()}_${randomUUID()}`;
1592+
storedEvents.set(eventId, { eventId, message, streamId });
15661593
return eventId;
15671594
},
15681595
async replayEventsAfter(
15691596
lastEventId: EventId,
15701597
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
15711598
): Promise<StreamId> {
1572-
const streamId = lastEventId.split('_')[0];
1573-
for (const [eventId, { message }] of storedEvents.entries()) {
1574-
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
1599+
const streamId = lastEventId.split('::')[0];
1600+
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
1601+
for (const [eventId, data] of storedEvents.entries()) {
1602+
if (data.streamId === streamId && eventId > lastEventId) {
1603+
eventsToReplay.push([eventId, data]);
1604+
}
1605+
}
1606+
eventsToReplay.sort(([a], [b]) => a.localeCompare(b));
1607+
for (const [eventId, { message }] of eventsToReplay) {
1608+
if (Object.keys(message).length > 0) {
15751609
await send(eventId, message);
15761610
}
15771611
}
@@ -1586,7 +1620,7 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15861620
}
15871621
});
15881622

1589-
it('should send priming event with retry field when retryInterval is configured', async () => {
1623+
it('should send priming event with retry field on POST SSE stream', async () => {
15901624
const result = await createTestServer({
15911625
sessionIdGenerator: () => randomUUID(),
15921626
eventStore: createEventStore(),
@@ -1595,27 +1629,42 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15951629
server = result.server;
15961630
transport = result.transport;
15971631
baseUrl = result.baseUrl;
1632+
mcpServer = result.mcpServer;
1633+
1634+
// Register a tool that we can call
1635+
mcpServer.tool('test-tool', 'A test tool', {}, async () => {
1636+
return { content: [{ type: 'text', text: 'Tool result' }] };
1637+
});
15981638

15991639
// Initialize to get session ID
16001640
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16011641
sessionId = initResponse.headers.get('mcp-session-id') as string;
16021642
expect(sessionId).toBeDefined();
16031643

1604-
// Open a standalone SSE stream
1605-
const sseResponse = await fetch(baseUrl, {
1606-
method: 'GET',
1644+
// Send a POST request that will return SSE stream
1645+
const toolCallRequest: JSONRPCMessage = {
1646+
jsonrpc: '2.0',
1647+
id: 1,
1648+
method: 'tools/call',
1649+
params: { name: 'test-tool', arguments: {} }
1650+
};
1651+
1652+
const postResponse = await fetch(baseUrl, {
1653+
method: 'POST',
16071654
headers: {
1608-
Accept: 'text/event-stream',
1655+
'Content-Type': 'application/json',
1656+
Accept: 'text/event-stream, application/json',
16091657
'mcp-session-id': sessionId,
16101658
'mcp-protocol-version': '2025-03-26'
1611-
}
1659+
},
1660+
body: JSON.stringify(toolCallRequest)
16121661
});
16131662

1614-
expect(sseResponse.status).toBe(200);
1615-
expect(sseResponse.headers.get('content-type')).toBe('text/event-stream');
1663+
expect(postResponse.status).toBe(200);
1664+
expect(postResponse.headers.get('content-type')).toBe('text/event-stream');
16161665

16171666
// Read the priming event
1618-
const reader = sseResponse.body?.getReader();
1667+
const reader = postResponse.body?.getReader();
16191668
const { value } = await reader!.read();
16201669
const text = new TextDecoder().decode(value);
16211670

@@ -1634,91 +1683,111 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
16341683
server = result.server;
16351684
transport = result.transport;
16361685
baseUrl = result.baseUrl;
1686+
mcpServer = result.mcpServer;
1687+
1688+
mcpServer.tool('test-tool', 'A test tool', {}, async () => {
1689+
return { content: [{ type: 'text', text: 'Tool result' }] };
1690+
});
16371691

16381692
// Initialize to get session ID
16391693
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16401694
sessionId = initResponse.headers.get('mcp-session-id') as string;
16411695
expect(sessionId).toBeDefined();
16421696

1643-
// Open a standalone SSE stream
1644-
const sseResponse = await fetch(baseUrl, {
1645-
method: 'GET',
1697+
// Send a POST request
1698+
const toolCallRequest: JSONRPCMessage = {
1699+
jsonrpc: '2.0',
1700+
id: 1,
1701+
method: 'tools/call',
1702+
params: { name: 'test-tool', arguments: {} }
1703+
};
1704+
1705+
const postResponse = await fetch(baseUrl, {
1706+
method: 'POST',
16461707
headers: {
1647-
Accept: 'text/event-stream',
1708+
'Content-Type': 'application/json',
1709+
Accept: 'text/event-stream, application/json',
16481710
'mcp-session-id': sessionId,
16491711
'mcp-protocol-version': '2025-03-26'
1650-
}
1712+
},
1713+
body: JSON.stringify(toolCallRequest)
16511714
});
16521715

1653-
expect(sseResponse.status).toBe(200);
1716+
expect(postResponse.status).toBe(200);
16541717

16551718
// Read the priming event
1656-
const reader = sseResponse.body?.getReader();
1657-
const { value: primingValue } = await reader!.read();
1658-
const primingText = new TextDecoder().decode(primingValue);
1659-
1660-
// Priming event should have id field for resumability but NOT retry field
1661-
expect(primingText).toContain('id: ');
1662-
expect(primingText).toContain('data: ');
1663-
expect(primingText).not.toContain('retry:');
1664-
1665-
// Verify priming event has ID
1666-
expect(primingText.match(/id: ([^\n]+)/)).toBeTruthy();
1667-
1668-
// Send a notification
1669-
const notification: JSONRPCMessage = {
1670-
jsonrpc: '2.0',
1671-
method: 'notifications/message',
1672-
params: { level: 'info', data: 'Test notification' }
1673-
};
1674-
await transport.send(notification);
1675-
1676-
// Read the notification
1677-
const { value: notifValue } = await reader!.read();
1678-
const notifText = new TextDecoder().decode(notifValue);
1719+
const reader = postResponse.body?.getReader();
1720+
const { value } = await reader!.read();
1721+
const text = new TextDecoder().decode(value);
16791722

1680-
// Notification should have content and no retry field
1681-
expect(notifText).toContain('Test notification');
1682-
expect(notifText).not.toContain('retry:');
1723+
// Priming event should have id field but NOT retry field
1724+
expect(text).toContain('id: ');
1725+
expect(text).toContain('data: ');
1726+
expect(text).not.toContain('retry:');
16831727
});
16841728

1685-
it('should include event ID in priming event for resumability', async () => {
1729+
it('should close POST SSE stream when closeSseStream is called', async () => {
16861730
const result = await createTestServer({
16871731
sessionIdGenerator: () => randomUUID(),
16881732
eventStore: createEventStore(),
1689-
retryInterval: 2000
1733+
retryInterval: 1000
16901734
});
16911735
server = result.server;
16921736
transport = result.transport;
16931737
baseUrl = result.baseUrl;
1738+
mcpServer = result.mcpServer;
1739+
1740+
// Register a tool that will hang until we close the stream
1741+
let toolResolve: () => void;
1742+
const toolPromise = new Promise<void>(resolve => {
1743+
toolResolve = resolve;
1744+
});
1745+
1746+
mcpServer.tool('slow-tool', 'A slow tool', {}, async () => {
1747+
await toolPromise;
1748+
return { content: [{ type: 'text', text: 'Done' }] };
1749+
});
16941750

16951751
// Initialize to get session ID
16961752
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16971753
sessionId = initResponse.headers.get('mcp-session-id') as string;
1754+
expect(sessionId).toBeDefined();
16981755

1699-
// Open SSE stream
1700-
const sseResponse = await fetch(baseUrl, {
1701-
method: 'GET',
1756+
// Send a POST request for the slow tool
1757+
const toolCallRequest: JSONRPCMessage = {
1758+
jsonrpc: '2.0',
1759+
id: 42,
1760+
method: 'tools/call',
1761+
params: { name: 'slow-tool', arguments: {} }
1762+
};
1763+
1764+
const postResponse = await fetch(baseUrl, {
1765+
method: 'POST',
17021766
headers: {
1703-
Accept: 'text/event-stream',
1767+
'Content-Type': 'application/json',
1768+
Accept: 'text/event-stream, application/json',
17041769
'mcp-session-id': sessionId,
17051770
'mcp-protocol-version': '2025-03-26'
1706-
}
1771+
},
1772+
body: JSON.stringify(toolCallRequest)
17071773
});
17081774

1709-
// Read priming event
1710-
const reader = sseResponse.body?.getReader();
1711-
const { value } = await reader!.read();
1712-
const text = new TextDecoder().decode(value);
1775+
expect(postResponse.status).toBe(200);
17131776

1714-
// Extract event ID from priming event
1715-
const idMatch = text.match(/id: ([^\n]+)/);
1716-
expect(idMatch).toBeTruthy();
1717-
const eventId = idMatch![1];
1777+
const reader = postResponse.body?.getReader();
1778+
1779+
// Read the priming event
1780+
await reader!.read();
1781+
1782+
// Close the SSE stream for this request
1783+
transport.closeSseStream(42);
1784+
1785+
// Stream should now be closed
1786+
const { done } = await reader!.read();
1787+
expect(done).toBe(true);
17181788

1719-
// Event ID should be present and non-empty
1720-
expect(eventId).toBeTruthy();
1721-
expect(eventId.length).toBeGreaterThan(0);
1789+
// Clean up - resolve the tool promise
1790+
toolResolve!();
17221791
});
17231792
});
17241793

0 commit comments

Comments
 (0)