Skip to content

Commit 55b9e30

Browse files
Remove standalone GET stream polling features
Keep PR focused on POST stream resumability only: - Remove closeStandaloneSSEStream() method - Remove priming event from GET notification stream - Remove GET stream polling tests - Update resumability tests to not expect GET priming events
1 parent 8469a85 commit 55b9e30

File tree

2 files changed

+0
-254
lines changed

2 files changed

+0
-254
lines changed

src/server/streamableHttp.test.ts

Lines changed: 0 additions & 238 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,12 +1343,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
13431343

13441344
const reader = sseResponse.body?.getReader();
13451345

1346-
// First read the priming event
1347-
const { value: primingValue } = await reader!.read();
1348-
const primingText = new TextDecoder().decode(primingValue);
1349-
expect(primingText).toContain('id: ');
1350-
expect(primingText).toContain('data: ');
1351-
13521346
// Send a notification that should be stored with an event ID
13531347
const notification: JSONRPCMessage = {
13541348
jsonrpc: '2.0',
@@ -1393,9 +1387,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
13931387

13941388
const reader = sseResponse.body?.getReader();
13951389

1396-
// First read the priming event
1397-
await reader!.read();
1398-
13991390
// Send a server notification through the MCP server
14001391
await mcpServer.server.sendLoggingMessage({ level: 'info', data: 'First notification from MCP server' });
14011392

@@ -1743,235 +1734,6 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
17431734
});
17441735
});
17451736

1746-
// Test SSE priming events for GET streams
1747-
describe('StreamableHTTPServerTransport GET SSE priming events', () => {
1748-
let server: Server;
1749-
let transport: StreamableHTTPServerTransport;
1750-
let baseUrl: URL;
1751-
let sessionId: string;
1752-
1753-
// Simple eventStore for priming event tests
1754-
const createEventStore = (): EventStore => {
1755-
const storedEvents = new Map<string, { eventId: string; message: JSONRPCMessage; streamId: string }>();
1756-
return {
1757-
async storeEvent(streamId: string, message: JSONRPCMessage): Promise<string> {
1758-
const eventId = `${streamId}::${Date.now()}_${randomUUID()}`;
1759-
storedEvents.set(eventId, { eventId, message, streamId });
1760-
return eventId;
1761-
},
1762-
async getStreamIdForEventId(eventId: string): Promise<string | undefined> {
1763-
const event = storedEvents.get(eventId);
1764-
return event?.streamId;
1765-
},
1766-
async replayEventsAfter(
1767-
lastEventId: EventId,
1768-
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
1769-
): Promise<StreamId> {
1770-
const event = storedEvents.get(lastEventId);
1771-
const streamId = event?.streamId || lastEventId.split('::')[0];
1772-
const eventsToReplay: Array<[string, { message: JSONRPCMessage }]> = [];
1773-
for (const [eventId, data] of storedEvents.entries()) {
1774-
if (data.streamId === streamId && eventId > lastEventId) {
1775-
eventsToReplay.push([eventId, data]);
1776-
}
1777-
}
1778-
eventsToReplay.sort(([a], [b]) => a.localeCompare(b));
1779-
for (const [eventId, { message }] of eventsToReplay) {
1780-
if (Object.keys(message).length > 0) {
1781-
await send(eventId, message);
1782-
}
1783-
}
1784-
return streamId;
1785-
}
1786-
};
1787-
};
1788-
1789-
afterEach(async () => {
1790-
if (server && transport) {
1791-
await stopTestServer({ server, transport });
1792-
}
1793-
});
1794-
1795-
it('should send priming event with retry field on GET SSE stream', async () => {
1796-
const result = await createTestServer({
1797-
sessionIdGenerator: () => randomUUID(),
1798-
eventStore: createEventStore(),
1799-
retryInterval: 5000
1800-
});
1801-
server = result.server;
1802-
transport = result.transport;
1803-
baseUrl = result.baseUrl;
1804-
1805-
// Initialize to get session ID
1806-
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1807-
sessionId = initResponse.headers.get('mcp-session-id') as string;
1808-
expect(sessionId).toBeDefined();
1809-
1810-
// Open a GET SSE stream
1811-
const getResponse = await fetch(baseUrl, {
1812-
method: 'GET',
1813-
headers: {
1814-
Accept: 'text/event-stream',
1815-
'mcp-session-id': sessionId,
1816-
'mcp-protocol-version': '2025-03-26'
1817-
}
1818-
});
1819-
1820-
expect(getResponse.status).toBe(200);
1821-
expect(getResponse.headers.get('content-type')).toBe('text/event-stream');
1822-
1823-
// Read the priming event
1824-
const reader = getResponse.body?.getReader();
1825-
const { value } = await reader!.read();
1826-
const text = new TextDecoder().decode(value);
1827-
1828-
// Verify priming event has id and retry field
1829-
expect(text).toContain('id: ');
1830-
expect(text).toContain('retry: 5000');
1831-
expect(text).toContain('data: ');
1832-
});
1833-
1834-
it('should send priming event without retry field when retryInterval is not configured', async () => {
1835-
const result = await createTestServer({
1836-
sessionIdGenerator: () => randomUUID(),
1837-
eventStore: createEventStore()
1838-
// No retryInterval
1839-
});
1840-
server = result.server;
1841-
transport = result.transport;
1842-
baseUrl = result.baseUrl;
1843-
1844-
// Initialize to get session ID
1845-
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1846-
sessionId = initResponse.headers.get('mcp-session-id') as string;
1847-
expect(sessionId).toBeDefined();
1848-
1849-
// Open a GET SSE stream
1850-
const getResponse = await fetch(baseUrl, {
1851-
method: 'GET',
1852-
headers: {
1853-
Accept: 'text/event-stream',
1854-
'mcp-session-id': sessionId,
1855-
'mcp-protocol-version': '2025-03-26'
1856-
}
1857-
});
1858-
1859-
expect(getResponse.status).toBe(200);
1860-
1861-
// Read the priming event
1862-
const reader = getResponse.body?.getReader();
1863-
const { value } = await reader!.read();
1864-
const text = new TextDecoder().decode(value);
1865-
1866-
// Priming event should have id field but NOT retry field
1867-
expect(text).toContain('id: ');
1868-
expect(text).toContain('data: ');
1869-
expect(text).not.toContain('retry:');
1870-
});
1871-
1872-
it('should close GET SSE stream when closeStandaloneSSEStream is called', async () => {
1873-
const result = await createTestServer({
1874-
sessionIdGenerator: () => randomUUID(),
1875-
eventStore: createEventStore(),
1876-
retryInterval: 1000
1877-
});
1878-
server = result.server;
1879-
transport = result.transport;
1880-
baseUrl = result.baseUrl;
1881-
1882-
// Initialize to get session ID
1883-
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1884-
sessionId = initResponse.headers.get('mcp-session-id') as string;
1885-
expect(sessionId).toBeDefined();
1886-
1887-
// Open a GET SSE stream
1888-
const getResponse = await fetch(baseUrl, {
1889-
method: 'GET',
1890-
headers: {
1891-
Accept: 'text/event-stream',
1892-
'mcp-session-id': sessionId,
1893-
'mcp-protocol-version': '2025-03-26'
1894-
}
1895-
});
1896-
1897-
expect(getResponse.status).toBe(200);
1898-
1899-
const reader = getResponse.body?.getReader();
1900-
1901-
// Read the priming event
1902-
await reader!.read();
1903-
1904-
// Close the standalone SSE stream
1905-
transport.closeStandaloneSSEStream();
1906-
1907-
// Stream should now be closed
1908-
const { done } = await reader!.read();
1909-
expect(done).toBe(true);
1910-
});
1911-
1912-
it('should allow GET SSE stream reconnection with Last-Event-ID after closeStandaloneSSEStream', async () => {
1913-
const result = await createTestServer({
1914-
sessionIdGenerator: () => randomUUID(),
1915-
eventStore: createEventStore(),
1916-
retryInterval: 1000
1917-
});
1918-
server = result.server;
1919-
transport = result.transport;
1920-
baseUrl = result.baseUrl;
1921-
1922-
// Initialize to get session ID
1923-
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1924-
sessionId = initResponse.headers.get('mcp-session-id') as string;
1925-
expect(sessionId).toBeDefined();
1926-
1927-
// Open a GET SSE stream
1928-
const getResponse = await fetch(baseUrl, {
1929-
method: 'GET',
1930-
headers: {
1931-
Accept: 'text/event-stream',
1932-
'mcp-session-id': sessionId,
1933-
'mcp-protocol-version': '2025-03-26'
1934-
}
1935-
});
1936-
1937-
expect(getResponse.status).toBe(200);
1938-
1939-
const reader = getResponse.body?.getReader();
1940-
1941-
// Read the priming event and extract event ID
1942-
const { value: primingValue } = await reader!.read();
1943-
const primingText = new TextDecoder().decode(primingValue);
1944-
expect(primingText).toContain('id: ');
1945-
expect(primingText).toContain('retry: 1000');
1946-
1947-
// Extract the priming event ID
1948-
const primingIdMatch = primingText.match(/id: ([^\n]+)/);
1949-
expect(primingIdMatch).toBeTruthy();
1950-
const primingEventId = primingIdMatch![1];
1951-
1952-
// Close the standalone SSE stream
1953-
transport.closeStandaloneSSEStream();
1954-
1955-
// Verify stream is closed
1956-
const { done } = await reader!.read();
1957-
expect(done).toBe(true);
1958-
1959-
// Client reconnects with Last-Event-ID
1960-
const reconnectResponse = await fetch(baseUrl, {
1961-
method: 'GET',
1962-
headers: {
1963-
Accept: 'text/event-stream',
1964-
'mcp-session-id': sessionId,
1965-
'mcp-protocol-version': '2025-03-26',
1966-
'last-event-id': primingEventId
1967-
}
1968-
});
1969-
1970-
expect(reconnectResponse.status).toBe(200);
1971-
expect(reconnectResponse.headers.get('content-type')).toBe('text/event-stream');
1972-
});
1973-
});
1974-
19751737
// Test onsessionclosed callback
19761738
describe('StreamableHTTPServerTransport onsessionclosed callback', () => {
19771739
it('should call onsessionclosed callback when session is closed via DELETE', async () => {

src/server/streamableHttp.ts

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,6 @@ export class StreamableHTTPServerTransport implements Transport {
358358
// otherwise the client will just wait for the first message
359359
res.writeHead(200, headers).flushHeaders();
360360

361-
// Send priming event for resumability/polling support
362-
await this._maybeWritePrimingEvent(res, this._standaloneSseStreamId);
363-
364361
// Assign the response to the standalone SSE stream
365362
this._streamMapping.set(this._standaloneSseStreamId, res);
366363
// Set up close handler for client disconnects
@@ -814,19 +811,6 @@ export class StreamableHTTPServerTransport implements Transport {
814811
}
815812
}
816813

817-
/**
818-
* Close the standalone SSE notification stream, triggering client reconnection.
819-
* Use this to implement polling behavior on the GET stream -
820-
* client will reconnect after the retry interval specified in the priming event.
821-
*/
822-
closeStandaloneSSEStream(): void {
823-
const stream = this._streamMapping.get(this._standaloneSseStreamId);
824-
if (stream) {
825-
stream.end();
826-
this._streamMapping.delete(this._standaloneSseStreamId);
827-
}
828-
}
829-
830814
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
831815
let requestId = options?.relatedRequestId;
832816
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {

0 commit comments

Comments
 (0)