Skip to content

Commit 54d359e

Browse files
fix: send priming events unconditionally for resumability (SEP-1699)
Per the spec, priming events (id + empty data) should always be sent on SSE stream start for resumability. The retry field is separate and should only be added when polling behavior is wanted via retryInterval. Previously, priming events were only sent when retryInterval was set, conflating resumability with polling. Now: - Priming events are always sent on both GET and POST SSE streams - Retry field is only included when retryInterval is configured
1 parent 91ecc6f commit 54d359e

File tree

2 files changed

+57
-37
lines changed

2 files changed

+57
-37
lines changed

src/server/streamableHttp.test.ts

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,23 @@ const TEST_MESSAGES = {
184184
*/
185185
async function readSSEEvent(response: Response): Promise<string> {
186186
const reader = response.body?.getReader();
187-
const { value } = await reader!.read();
188-
return new TextDecoder().decode(value);
187+
// Keep reading until we get an event with actual data (skip priming events)
188+
while (true) {
189+
const { value, done } = await reader!.read();
190+
if (done) break;
191+
const text = new TextDecoder().decode(value);
192+
// Check all data lines (multiple events may come in one read)
193+
const dataLines = text.split('\n').filter(line => line.startsWith('data:'));
194+
// Find first data line with actual content (not just empty priming event)
195+
const dataLineWithContent = dataLines.find(line => {
196+
const content = line.substring(5).trim();
197+
return content !== '';
198+
});
199+
if (dataLineWithContent) {
200+
return text;
201+
}
202+
}
203+
return '';
189204
}
190205

191206
/**
@@ -313,7 +328,7 @@ describe('StreamableHTTPServerTransport', () => {
313328

314329
// Parse the SSE event
315330
const eventLines = text.split('\n');
316-
const dataLine = eventLines.find(line => line.startsWith('data:'));
331+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
317332
expect(dataLine).toBeDefined();
318333

319334
const eventData = JSON.parse(dataLine!.substring(5));
@@ -351,7 +366,7 @@ describe('StreamableHTTPServerTransport', () => {
351366

352367
const text = await readSSEEvent(response);
353368
const eventLines = text.split('\n');
354-
const dataLine = eventLines.find(line => line.startsWith('data:'));
369+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
355370
expect(dataLine).toBeDefined();
356371

357372
const eventData = JSON.parse(dataLine!.substring(5));
@@ -406,7 +421,7 @@ describe('StreamableHTTPServerTransport', () => {
406421

407422
const text = await readSSEEvent(response);
408423
const eventLines = text.split('\n');
409-
const dataLine = eventLines.find(line => line.startsWith('data:'));
424+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
410425
expect(dataLine).toBeDefined();
411426

412427
const eventData = JSON.parse(dataLine!.substring(5));
@@ -489,7 +504,7 @@ describe('StreamableHTTPServerTransport', () => {
489504
const text = await readSSEEvent(sseResponse);
490505

491506
const eventLines = text.split('\n');
492-
const dataLine = eventLines.find(line => line.startsWith('data:'));
507+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
493508
expect(dataLine).toBeDefined();
494509

495510
const eventData = JSON.parse(dataLine!.substring(5));
@@ -516,6 +531,9 @@ describe('StreamableHTTPServerTransport', () => {
516531
expect(sseResponse.status).toBe(200);
517532
const reader = sseResponse.body?.getReader();
518533

534+
// Skip the priming event
535+
await reader!.read();
536+
519537
// Send multiple notifications
520538
const notification1: JSONRPCMessage = {
521539
jsonrpc: '2.0',
@@ -985,7 +1003,7 @@ describe('StreamableHTTPServerTransport with AuthInfo', () => {
9851003

9861004
const text = await readSSEEvent(response);
9871005
const eventLines = text.split('\n');
988-
const dataLine = eventLines.find(line => line.startsWith('data:'));
1006+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
9891007
expect(dataLine).toBeDefined();
9901008

9911009
const eventData = JSON.parse(dataLine!.substring(5));
@@ -1021,7 +1039,7 @@ describe('StreamableHTTPServerTransport with AuthInfo', () => {
10211039

10221040
const text = await readSSEEvent(response);
10231041
const eventLines = text.split('\n');
1024-
const dataLine = eventLines.find(line => line.startsWith('data:'));
1042+
const dataLine = eventLines.find(line => line.startsWith('data:') && line.substring(5).trim() !== '');
10251043
expect(dataLine).toBeDefined();
10261044

10271045
const eventData = JSON.parse(dataLine!.substring(5));
@@ -1352,6 +1370,10 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13521370

13531371
// Read from the stream and verify we got the notification with an event ID
13541372
const reader = sseResponse.body?.getReader();
1373+
1374+
// Skip the priming event
1375+
await reader!.read();
1376+
13551377
const { value } = await reader!.read();
13561378
const text = new TextDecoder().decode(value);
13571379

@@ -1388,6 +1410,10 @@ describe('StreamableHTTPServerTransport with resumability', () => {
13881410

13891411
// Read the notification from the SSE stream
13901412
const reader = sseResponse.body?.getReader();
1413+
1414+
// Skip the priming event
1415+
await reader!.read();
1416+
13911417
const { value } = await reader!.read();
13921418
const text = new TextDecoder().decode(value);
13931419

@@ -1569,7 +1595,7 @@ describe('StreamableHTTPServerTransport SSE priming events (SEP-1699)', () => {
15691595
expect(text).toContain('data: ');
15701596
});
15711597

1572-
it('should not send priming event when retryInterval is not configured', async () => {
1598+
it('should send priming event without retry field when retryInterval is not configured', async () => {
15731599
const result = await createTestServer({
15741600
sessionIdGenerator: () => randomUUID()
15751601
// No retryInterval
@@ -1595,23 +1621,15 @@ describe('StreamableHTTPServerTransport SSE priming events (SEP-1699)', () => {
15951621

15961622
expect(sseResponse.status).toBe(200);
15971623

1598-
// Send a notification to get some output
1599-
const notification: JSONRPCMessage = {
1600-
jsonrpc: '2.0',
1601-
method: 'notifications/message',
1602-
params: { level: 'info', data: 'Test' }
1603-
};
1604-
await transport.send(notification);
1605-
1606-
// Read from stream
1624+
// Read the priming event
16071625
const reader = sseResponse.body?.getReader();
16081626
const { value } = await reader!.read();
16091627
const text = new TextDecoder().decode(value);
16101628

1611-
// Should NOT have retry field (since retryInterval not configured)
1629+
// Should have id field for resumability but NOT retry field
1630+
expect(text).toContain('id: ');
1631+
expect(text).toContain('data: ');
16121632
expect(text).not.toContain('retry:');
1613-
// Should have the notification
1614-
expect(text).toContain('Test');
16151633
});
16161634

16171635
it('should include event ID in priming event for resumability', async () => {

src/server/streamableHttp.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -330,16 +330,17 @@ export class StreamableHTTPServerTransport implements Transport {
330330
res.writeHead(200, headers).flushHeaders();
331331

332332
// SEP-1699: Send priming event with id and empty data to establish resumption capability
333-
// This primes the client's Last-Event-ID for reconnection and sets retry interval
334-
// Only send when retryInterval is configured (enabling polling behavior)
335-
if (this._retryInterval !== undefined) {
336-
const primingEventId = this._eventStore
337-
? await this._eventStore.storeEvent(this._standaloneSseStreamId, {} as JSONRPCMessage)
338-
: `priming-${Date.now()}`;
333+
// This primes the client's Last-Event-ID for reconnection
334+
const primingEventId = this._eventStore
335+
? await this._eventStore.storeEvent(this._standaloneSseStreamId, {} as JSONRPCMessage)
336+
: `priming-${Date.now()}`;
339337

340-
const primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
341-
res.write(primingEvent);
338+
// Build priming event - always include id for resumability, optionally add retry for polling
339+
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
340+
if (this._retryInterval !== undefined) {
341+
primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
342342
}
343+
res.write(primingEvent);
343344

344345
// Assign the response to the standalone SSE stream
345346
this._streamMapping.set(this._standaloneSseStreamId, res);
@@ -570,16 +571,17 @@ export class StreamableHTTPServerTransport implements Transport {
570571
res.writeHead(200, headers);
571572

572573
// SEP-1699: Send priming event with id and empty data to establish resumption capability
573-
// This primes the client's Last-Event-ID for reconnection and sets retry interval
574-
// Only send when retryInterval is configured (enabling polling behavior)
575-
if (this._retryInterval !== undefined) {
576-
const primingEventId = this._eventStore
577-
? await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage)
578-
: `priming-${Date.now()}`;
574+
// This primes the client's Last-Event-ID for reconnection
575+
const primingEventId = this._eventStore
576+
? await this._eventStore.storeEvent(streamId, {} as JSONRPCMessage)
577+
: `priming-${Date.now()}`;
579578

580-
const primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
581-
res.write(primingEvent);
579+
// Build priming event - always include id for resumability, optionally add retry for polling
580+
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
581+
if (this._retryInterval !== undefined) {
582+
primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
582583
}
584+
res.write(primingEvent);
583585
}
584586
// Store the response for this request to send messages back through this connection
585587
// We need to track by request ID to maintain the connection

0 commit comments

Comments
 (0)