Skip to content

Commit 8f6701e

Browse files
feat: add GET stream polling support for SEP-1699
- Add closeStandaloneSSEStream() method to allow server to close the standalone GET notification stream, triggering client reconnection - Send priming event with retry field on GET streams (when eventStore configured) for resumability - Add tests for GET stream priming events and closeStandaloneSSEStream - Fix flaky test timeout for POST SSE polling test
1 parent 6d32c15 commit 8f6701e

File tree

2 files changed

+259
-3
lines changed

2 files changed

+259
-3
lines changed

src/server/streamableHttp.test.ts

Lines changed: 243 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,6 +1345,14 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13451345
expect(sseResponse.status).toBe(200);
13461346
expect(sseResponse.headers.get('content-type')).toBe('text/event-stream');
13471347

1348+
const reader = sseResponse.body?.getReader();
1349+
1350+
// First read the priming event (SEP-1699)
1351+
const { value: primingValue } = await reader!.read();
1352+
const primingText = new TextDecoder().decode(primingValue);
1353+
expect(primingText).toContain('id: ');
1354+
expect(primingText).toContain('data: ');
1355+
13481356
// Send a notification that should be stored with an event ID
13491357
const notification: JSONRPCMessage = {
13501358
jsonrpc: '2.0',
@@ -1356,7 +1364,6 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13561364
await transport.send(notification);
13571365

13581366
// Read from the stream and verify we got the notification with an event ID
1359-
const reader = sseResponse.body?.getReader();
13601367
const { value } = await reader!.read();
13611368
const text = new TextDecoder().decode(value);
13621369

@@ -1388,11 +1395,15 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13881395
});
13891396
expect(sseResponse.status).toBe(200);
13901397

1398+
const reader = sseResponse.body?.getReader();
1399+
1400+
// First read the priming event (SEP-1699)
1401+
await reader!.read();
1402+
13911403
// Send a server notification through the MCP server
13921404
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'First notification from MCP server' });
13931405

13941406
// Read the notification from the SSE stream
1395-
const reader = sseResponse.body?.getReader();
13961407
const { value } = await reader!.read();
13971408
const text = new TextDecoder().decode(value);
13981409

@@ -1745,7 +1756,7 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
17451756
toolResolve!();
17461757
});
17471758

1748-
it('should support POST SSE polling with client reconnection', async () => {
1759+
it('should support POST SSE polling with client reconnection', { timeout: 10000 }, async () => {
17491760
const result = await createTestServer({
17501761
sessionIdGenerator: () => randomUUID(),
17511762
eventStore: createEventStore(),
@@ -2132,6 +2143,235 @@ describe('StreamableHTTPServerTransport POST SSE priming events', () => {
21322143
});
21332144
});
21342145

2146+
// Test SSE priming events for GET streams (SEP-1699)
2147+
describe('StreamableHTTPServerTransport GET SSE priming events', () => {
2148+
let server: Server;
2149+
let transport: StreamableHTTPServerTransport;
2150+
let baseUrl: URL;
2151+
let sessionId: string;
2152+
2153+
// Simple eventStore for priming event tests
2154+
const createEventStore = (): EventStore => {
2155+
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage; streamId: string }>();
2156+
return {
2157+
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
2158+
const eventId = `${streamId}::${Date.now()}_${randomUUID()}`;
2159+
storedEvents.set(eventId, { eventId, message, streamId });
2160+
return eventId;
2161+
},
2162+
async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
2163+
const event = storedEvents.get(eventId);
2164+
return event?.streamId;
2165+
},
2166+
async replayEventsAfter(
2167+
lastEventId: EventId,
2168+
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
2169+
): Promise<StreamId> {
2170+
const event = storedEvents.get(lastEventId);
2171+
const streamId = event?.streamId || lastEventId.split('::')[0];
2172+
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
2173+
for (const [eventId, data] of storedEvents.entries()) {
2174+
if (data.streamId === streamId && eventId > lastEventId) {
2175+
eventsToReplay.push([eventId, data]);
2176+
}
2177+
}
2178+
eventsToReplay.sort(([a], [b]) => a.localeCompare(b));
2179+
for (const [eventId, { message }] of eventsToReplay) {
2180+
if (Object.keys(message).length > 0) {
2181+
await send(eventId, message);
2182+
}
2183+
}
2184+
return streamId;
2185+
}
2186+
};
2187+
};
2188+
2189+
afterEach(async () => {
2190+
if (server && transport) {
2191+
await stopTestServer({ server, transport });
2192+
}
2193+
});
2194+
2195+
it('should send priming event with retry field on GET SSE stream', async () => {
2196+
const result = await createTestServer({
2197+
sessionIdGenerator: () => randomUUID(),
2198+
eventStore: createEventStore(),
2199+
retryInterval: 5000
2200+
});
2201+
server = result.server;
2202+
transport = result.transport;
2203+
baseUrl = result.baseUrl;
2204+
2205+
// Initialize to get session ID
2206+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2207+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2208+
expect(sessionId).toBeDefined();
2209+
2210+
// Open a GET SSE stream
2211+
const getResponse = await fetch(baseUrl, {
2212+
method: 'GET',
2213+
headers: {
2214+
Accept: 'text/event-stream',
2215+
'mcp-session-id': sessionId,
2216+
'mcp-protocol-version': '2025-03-26'
2217+
}
2218+
});
2219+
2220+
expect(getResponse.status).toBe(200);
2221+
expect(getResponse.headers.get('content-type')).toBe('text/event-stream');
2222+
2223+
// Read the priming event
2224+
const reader = getResponse.body?.getReader();
2225+
const { value } = await reader!.read();
2226+
const text = new TextDecoder().decode(value);
2227+
2228+
// Verify priming event has id and retry field
2229+
expect(text).toContain('id: ');
2230+
expect(text).toContain('retry: 5000');
2231+
expect(text).toContain('data: ');
2232+
});
2233+
2234+
it('should send priming event without retry field when retryInterval is not configured', async () => {
2235+
const result = await createTestServer({
2236+
sessionIdGenerator: () => randomUUID(),
2237+
eventStore: createEventStore()
2238+
// No retryInterval
2239+
});
2240+
server = result.server;
2241+
transport = result.transport;
2242+
baseUrl = result.baseUrl;
2243+
2244+
// Initialize to get session ID
2245+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2246+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2247+
expect(sessionId).toBeDefined();
2248+
2249+
// Open a GET SSE stream
2250+
const getResponse = await fetch(baseUrl, {
2251+
method: 'GET',
2252+
headers: {
2253+
Accept: 'text/event-stream',
2254+
'mcp-session-id': sessionId,
2255+
'mcp-protocol-version': '2025-03-26'
2256+
}
2257+
});
2258+
2259+
expect(getResponse.status).toBe(200);
2260+
2261+
// Read the priming event
2262+
const reader = getResponse.body?.getReader();
2263+
const { value } = await reader!.read();
2264+
const text = new TextDecoder().decode(value);
2265+
2266+
// Priming event should have id field but NOT retry field
2267+
expect(text).toContain('id: ');
2268+
expect(text).toContain('data: ');
2269+
expect(text).not.toContain('retry:');
2270+
});
2271+
2272+
it('should close GET SSE stream when closeStandaloneSSEStream is called', async () => {
2273+
const result = await createTestServer({
2274+
sessionIdGenerator: () => randomUUID(),
2275+
eventStore: createEventStore(),
2276+
retryInterval: 1000
2277+
});
2278+
server = result.server;
2279+
transport = result.transport;
2280+
baseUrl = result.baseUrl;
2281+
2282+
// Initialize to get session ID
2283+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2284+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2285+
expect(sessionId).toBeDefined();
2286+
2287+
// Open a GET SSE stream
2288+
const getResponse = await fetch(baseUrl, {
2289+
method: 'GET',
2290+
headers: {
2291+
Accept: 'text/event-stream',
2292+
'mcp-session-id': sessionId,
2293+
'mcp-protocol-version': '2025-03-26'
2294+
}
2295+
});
2296+
2297+
expect(getResponse.status).toBe(200);
2298+
2299+
const reader = getResponse.body?.getReader();
2300+
2301+
// Read the priming event
2302+
await reader!.read();
2303+
2304+
// Close the standalone SSE stream
2305+
transport.closeStandaloneSSEStream();
2306+
2307+
// Stream should now be closed
2308+
const { done } = await reader!.read();
2309+
expect(done).toBe(true);
2310+
});
2311+
2312+
it('should allow GET SSE stream reconnection with Last-Event-ID after closeStandaloneSSEStream', async () => {
2313+
const result = await createTestServer({
2314+
sessionIdGenerator: () => randomUUID(),
2315+
eventStore: createEventStore(),
2316+
retryInterval: 1000
2317+
});
2318+
server = result.server;
2319+
transport = result.transport;
2320+
baseUrl = result.baseUrl;
2321+
2322+
// Initialize to get session ID
2323+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
2324+
sessionId = initResponse.headers.get('mcp-session-id') as string;
2325+
expect(sessionId).toBeDefined();
2326+
2327+
// Open a GET SSE stream
2328+
const getResponse = await fetch(baseUrl, {
2329+
method: 'GET',
2330+
headers: {
2331+
Accept: 'text/event-stream',
2332+
'mcp-session-id': sessionId,
2333+
'mcp-protocol-version': '2025-03-26'
2334+
}
2335+
});
2336+
2337+
expect(getResponse.status).toBe(200);
2338+
2339+
const reader = getResponse.body?.getReader();
2340+
2341+
// Read the priming event and extract event ID
2342+
const { value: primingValue } = await reader!.read();
2343+
const primingText = new TextDecoder().decode(primingValue);
2344+
expect(primingText).toContain('id: ');
2345+
expect(primingText).toContain('retry: 1000');
2346+
2347+
// Extract the priming event ID
2348+
const primingIdMatch = primingText.match(/id: ([^\n]+)/);
2349+
expect(primingIdMatch).toBeTruthy();
2350+
const primingEventId = primingIdMatch![1];
2351+
2352+
// Close the standalone SSE stream
2353+
transport.closeStandaloneSSEStream();
2354+
2355+
// Verify stream is closed
2356+
const { done } = await reader!.read();
2357+
expect(done).toBe(true);
2358+
2359+
// Client reconnects with Last-Event-ID
2360+
const reconnectResponse = await fetch(baseUrl, {
2361+
method: 'GET',
2362+
headers: {
2363+
Accept: 'text/event-stream',
2364+
'mcp-session-id': sessionId,
2365+
'mcp-protocol-version': '2025-03-26',
2366+
'last-event-id': primingEventId
2367+
}
2368+
});
2369+
2370+
expect(reconnectResponse.status).toBe(200);
2371+
expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream');
2372+
});
2373+
});
2374+
21352375
// Test onsessionclosed callback
21362376
describe('StreamableHTTPServerTransport onsessionclosed callback', () => {
21372377
it('should call onsessionclosed callback when session is closed via DELETE', async () => {

src/server/streamableHttp.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,9 @@ export class StreamableHTTPServerTransport implements Transport {
358358
// otherwise the client will just wait for the first message
359359
res.writeHead(200, headers).flushHeaders();
360360

361+
// Send priming event for resumability/polling support
362+
await this._maybeWritePrimingEvent(res, this._standaloneSseStreamId);
363+
361364
// Assign the response to the standalone SSE stream
362365
this._streamMapping.set(this._standaloneSseStreamId, res);
363366
// Set up close handler for client disconnects
@@ -811,6 +814,19 @@ export class StreamableHTTPServerTransport implements Transport {
811814
}
812815
}
813816

817+
/**
818+
* Close the standalone SSE notification stream, triggering client reconnection.
819+
* Use this to implement polling behavior on the GET stream -
820+
* client will reconnect after the retry interval specified in the priming event.
821+
*/
822+
closeStandaloneSSEStream(): void {
823+
const stream = this._streamMapping.get(this._standaloneSseStreamId);
824+
if (stream) {
825+
stream.end();
826+
this._streamMapping.delete(this._standaloneSseStreamId);
827+
}
828+
}
829+
814830
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
815831
let requestId = options?.relatedRequestId;
816832
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {

0 commit comments

Comments
 (0)