Skip to content

Commit 881e874

Browse files
feat: add closeSseStream API for POST SSE polling (SEP-1699)
- Remove priming from GET streams (only POST streams per spec) - Replace closeStandaloneSseStream() with closeSseStream(requestId) - Add validation that retryInterval requires eventStore - Add tests for POST SSE priming events and stream closure - Restore original resumability tests without priming expectations
1 parent b4b7aa1 commit 881e874

File tree

2 files changed

+176
-86
lines changed

2 files changed

+176
-86
lines changed

src/server/streamableHttp.test.ts

Lines changed: 149 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,11 +1361,6 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13611361
// Read from the stream
13621362
const reader = sseResponse.body?.getReader();
13631363

1364-
// Read and validate the priming event
1365-
const { value: primingValue } = await reader!.read();
1366-
const primingText = new TextDecoder().decode(primingValue);
1367-
expect(primingText).toContain('id: ');
1368-
13691364
// Send a notification that should be stored with an event ID
13701365
const notification: JSONRPCMessage = {
13711366
jsonrpc: '2.0',
@@ -1413,12 +1408,6 @@ describe('StreamableHTTPServerTransport with resumability', () => {
14131408

14141409
// Read the notification from the SSE stream
14151410
const reader = sseResponse.body?.getReader();
1416-
1417-
// Read and validate the priming event
1418-
const { value: primingValue } = await reader!.read();
1419-
const primingText = new TextDecoder().decode(primingValue);
1420-
expect(primingText).toContain('id: ');
1421-
14221411
const { value } = await reader!.read();
14231412
const text = new TextDecoder().decode(value);
14241413

@@ -1549,29 +1538,76 @@ describe('StreamableHTTPServerTransport in stateless mode', () => {
15491538
});
15501539
});
15511540

1552-
// Test SSE priming events
1553-
describe('StreamableHTTPServerTransport SSE priming events', () => {
1541+
// Test retryInterval validation
1542+
describe('StreamableHTTPServerTransport retryInterval validation', () => {
1543+
it('should throw error when retryInterval is configured without eventStore', () => {
1544+
expect(() => {
1545+
new StreamableHTTPServerTransport({
1546+
sessionIdGenerator: () => randomUUID(),
1547+
retryInterval: 5000
1548+
// No eventStore
1549+
});
1550+
}).toThrow('retryInterval requires eventStore to be configured for resumability');
1551+
});
1552+
1553+
it('should not throw when retryInterval is configured with eventStore', async () => {
1554+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage }>();
1555+
const eventStore: EventStore = {
1556+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1557+
const eventId = `${streamId}_${randomUUID()}`;
1558+
storedEvents.set(eventId, { eventId, message });
1559+
return eventId;
1560+
},
1561+
async replayEventsAfter(
1562+
lastEventId: EventId,
1563+
{ send: _send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
1564+
): Promise<StreamId> {
1565+
const streamId = lastEventId.split('_')[0];
1566+
return streamId;
1567+
}
1568+
};
1569+
1570+
expect(() => {
1571+
new StreamableHTTPServerTransport({
1572+
sessionIdGenerator: () => randomUUID(),
1573+
eventStore,
1574+
retryInterval: 5000
1575+
});
1576+
}).not.toThrow();
1577+
});
1578+
});
1579+
1580+
// Test SSE priming events for POST streams (SEP-1699)
1581+
describe('StreamableHTTPServerTransport POST SSE priming events', () => {
15541582
let server: Server;
15551583
let transport: StreamableHTTPServerTransport;
15561584
let baseUrl: URL;
15571585
let sessionId: string;
1586+
let mcpServer: McpServer;
15581587

15591588
// Simple eventStore for priming event tests
15601589
const createEventStore = (): EventStore => {
1561-
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage }>();
1590+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage; streamId: string }>();
15621591
return {
15631592
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1564-
const eventId = `${streamId}_${randomUUID()}`;
1565-
storedEvents.set(eventId, { eventId, message });
1593+
const eventId = `${streamId}::${Date.now()}_${randomUUID()}`;
1594+
storedEvents.set(eventId, { eventId, message, streamId });
15661595
return eventId;
15671596
},
15681597
async replayEventsAfter(
15691598
lastEventId: EventId,
15701599
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
15711600
): Promise<StreamId> {
1572-
const streamId = lastEventId.split('_')[0];
1573-
for (const [eventId, { message }] of storedEvents.entries()) {
1574-
if (eventId.startsWith(streamId) && eventId !== lastEventId) {
1601+
const streamId = lastEventId.split('::')[0];
1602+
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
1603+
for (const [eventId, data] of storedEvents.entries()) {
1604+
if (data.streamId === streamId && eventId > lastEventId) {
1605+
eventsToReplay.push([eventId, data]);
1606+
}
1607+
}
1608+
eventsToReplay.sort(([a], [b]) => a.localeCompare(b));
1609+
for (const [eventId, { message }] of eventsToReplay) {
1610+
if (Object.keys(message).length > 0) {
15751611
await send(eventId, message);
15761612
}
15771613
}
@@ -1586,7 +1622,7 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15861622
}
15871623
});
15881624

1589-
it('should send priming event with retry field when retryInterval is configured', async () => {
1625+
it('should send priming event with retry field on POST SSE stream', async () => {
15901626
const result = await createTestServer({
15911627
sessionIdGenerator: () => randomUUID(),
15921628
eventStore: createEventStore(),
@@ -1595,27 +1631,42 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
15951631
server = result.server;
15961632
transport = result.transport;
15971633
baseUrl = result.baseUrl;
1634+
mcpServer = result.mcpServer;
1635+
1636+
// Register a tool that we can call
1637+
mcpServer.tool('test-tool', 'A test tool', {}, async () => {
1638+
return { content: [{ type: 'text', text: 'Tool result' }] };
1639+
});
15981640

15991641
// Initialize to get session ID
16001642
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16011643
sessionId = initResponse.headers.get('mcp-session-id') as string;
16021644
expect(sessionId).toBeDefined();
16031645

1604-
// Open a standalone SSE stream
1605-
const sseResponse = await fetch(baseUrl, {
1606-
method: 'GET',
1646+
// Send a POST request that will return SSE stream
1647+
const toolCallRequest: JSONRPCMessage = {
1648+
jsonrpc: '2.0',
1649+
id: 1,
1650+
method: 'tools/call',
1651+
params: { name: 'test-tool', arguments: {} }
1652+
};
1653+
1654+
const postResponse = await fetch(baseUrl, {
1655+
method: 'POST',
16071656
headers: {
1608-
Accept: 'text/event-stream',
1657+
'Content-Type': 'application/json',
1658+
Accept: 'text/event-stream, application/json',
16091659
'mcp-session-id': sessionId,
16101660
'mcp-protocol-version': '2025-03-26'
1611-
}
1661+
},
1662+
body: JSON.stringify(toolCallRequest)
16121663
});
16131664

1614-
expect(sseResponse.status).toBe(200);
1615-
expect(sseResponse.headers.get('content-type')).toBe('text/event-stream');
1665+
expect(postResponse.status).toBe(200);
1666+
expect(postResponse.headers.get('content-type')).toBe('text/event-stream');
16161667

16171668
// Read the priming event
1618-
const reader = sseResponse.body?.getReader();
1669+
const reader = postResponse.body?.getReader();
16191670
const { value } = await reader!.read();
16201671
const text = new TextDecoder().decode(value);
16211672

@@ -1634,91 +1685,111 @@ describe('StreamableHTTPServerTransport SSE priming events', () => {
16341685
server = result.server;
16351686
transport = result.transport;
16361687
baseUrl = result.baseUrl;
1688+
mcpServer = result.mcpServer;
1689+
1690+
mcpServer.tool('test-tool', 'A test tool', {}, async () => {
1691+
return { content: [{ type: 'text', text: 'Tool result' }] };
1692+
});
16371693

16381694
// Initialize to get session ID
16391695
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16401696
sessionId = initResponse.headers.get('mcp-session-id') as string;
16411697
expect(sessionId).toBeDefined();
16421698

1643-
// Open a standalone SSE stream
1644-
const sseResponse = await fetch(baseUrl, {
1645-
method: 'GET',
1699+
// Send a POST request
1700+
const toolCallRequest: JSONRPCMessage = {
1701+
jsonrpc: '2.0',
1702+
id: 1,
1703+
method: 'tools/call',
1704+
params: { name: 'test-tool', arguments: {} }
1705+
};
1706+
1707+
const postResponse = await fetch(baseUrl, {
1708+
method: 'POST',
16461709
headers: {
1647-
Accept: 'text/event-stream',
1710+
'Content-Type': 'application/json',
1711+
Accept: 'text/event-stream, application/json',
16481712
'mcp-session-id': sessionId,
16491713
'mcp-protocol-version': '2025-03-26'
1650-
}
1714+
},
1715+
body: JSON.stringify(toolCallRequest)
16511716
});
16521717

1653-
expect(sseResponse.status).toBe(200);
1718+
expect(postResponse.status).toBe(200);
16541719

16551720
// Read the priming event
1656-
const reader = sseResponse.body?.getReader();
1657-
const { value: primingValue } = await reader!.read();
1658-
const primingText = new TextDecoder().decode(primingValue);
1659-
1660-
// Priming event should have id field for resumability but NOT retry field
1661-
expect(primingText).toContain('id: ');
1662-
expect(primingText).toContain('data: ');
1663-
expect(primingText).not.toContain('retry:');
1664-
1665-
// Verify priming event has ID
1666-
expect(primingText.match(/id: ([^\n]+)/)).toBeTruthy();
1667-
1668-
// Send a notification
1669-
const notification: JSONRPCMessage = {
1670-
jsonrpc: '2.0',
1671-
method: 'notifications/message',
1672-
params: { level: 'info', data: 'Test notification' }
1673-
};
1674-
await transport.send(notification);
1675-
1676-
// Read the notification
1677-
const { value: notifValue } = await reader!.read();
1678-
const notifText = new TextDecoder().decode(notifValue);
1721+
const reader = postResponse.body?.getReader();
1722+
const { value } = await reader!.read();
1723+
const text = new TextDecoder().decode(value);
16791724

1680-
// Notification should have content and no retry field
1681-
expect(notifText).toContain('Test notification');
1682-
expect(notifText).not.toContain('retry:');
1725+
// Priming event should have id field but NOT retry field
1726+
expect(text).toContain('id: ');
1727+
expect(text).toContain('data: ');
1728+
expect(text).not.toContain('retry:');
16831729
});
16841730

1685-
it('should include event ID in priming event for resumability', async () => {
1731+
it('should close POST SSE stream when closeSseStream is called', async () => {
16861732
const result = await createTestServer({
16871733
sessionIdGenerator: () => randomUUID(),
16881734
eventStore: createEventStore(),
1689-
retryInterval: 2000
1735+
retryInterval: 1000
16901736
});
16911737
server = result.server;
16921738
transport = result.transport;
16931739
baseUrl = result.baseUrl;
1740+
mcpServer = result.mcpServer;
1741+
1742+
// Register a tool that will hang until we close the stream
1743+
let toolResolve: () => void;
1744+
const toolPromise = new Promise<void>(resolve => {
1745+
toolResolve = resolve;
1746+
});
1747+
1748+
mcpServer.tool('slow-tool', 'A slow tool', {}, async () => {
1749+
await toolPromise;
1750+
return { content: [{ type: 'text', text: 'Done' }] };
1751+
});
16941752

16951753
// Initialize to get session ID
16961754
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
16971755
sessionId = initResponse.headers.get('mcp-session-id') as string;
1756+
expect(sessionId).toBeDefined();
16981757

1699-
// Open SSE stream
1700-
const sseResponse = await fetch(baseUrl, {
1701-
method: 'GET',
1758+
// Send a POST request for the slow tool
1759+
const toolCallRequest: JSONRPCMessage = {
1760+
jsonrpc: '2.0',
1761+
id: 42,
1762+
method: 'tools/call',
1763+
params: { name: 'slow-tool', arguments: {} }
1764+
};
1765+
1766+
const postResponse = await fetch(baseUrl, {
1767+
method: 'POST',
17021768
headers: {
1703-
Accept: 'text/event-stream',
1769+
'Content-Type': 'application/json',
1770+
Accept: 'text/event-stream, application/json',
17041771
'mcp-session-id': sessionId,
17051772
'mcp-protocol-version': '2025-03-26'
1706-
}
1773+
},
1774+
body: JSON.stringify(toolCallRequest)
17071775
});
17081776

1709-
// Read priming event
1710-
const reader = sseResponse.body?.getReader();
1711-
const { value } = await reader!.read();
1712-
const text = new TextDecoder().decode(value);
1777+
expect(postResponse.status).toBe(200);
17131778

1714-
// Extract event ID from priming event
1715-
const idMatch = text.match(/id: ([^\n]+)/);
1716-
expect(idMatch).toBeTruthy();
1717-
const eventId = idMatch![1];
1779+
const reader = postResponse.body?.getReader();
1780+
1781+
// Read the priming event
1782+
await reader!.read();
1783+
1784+
// Close the SSE stream for this request
1785+
transport.closeSseStream(42);
1786+
1787+
// Stream should now be closed
1788+
const { done } = await reader!.read();
1789+
expect(done).toBe(true);
17181790

1719-
// Event ID should be present and non-empty
1720-
expect(eventId).toBeTruthy();
1721-
expect(eventId.length).toBeGreaterThan(0);
1791+
// Clean up - resolve the tool promise
1792+
toolResolve!();
17221793
});
17231794
});
17241795

0 commit comments

Comments
 (0)