Skip to content

Commit eb2ccc0

Browse files
fix: address CodeRabbit review feedback
1. Add null/object guard before enqueuing UIMessageChunk from SSE stream to handle heartbeat or malformed events safely 2. Use incrementing counter instead of Date.now() in test message factories to avoid duplicate IDs 3. Add test covering publicAccessToken from trigger response being used for stream subscription auth Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent e4c30b0 commit eb2ccc0

File tree

2 files changed

+85
-4
lines changed

2 files changed

+85
-4
lines changed

packages/trigger-sdk/src/v3/chat.test.ts

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@ function createSSEStream(sseText: string): ReadableStream<Uint8Array> {
1818
});
1919
}
2020

21-
// Helper: create test UIMessages
21+
// Helper: create test UIMessages with unique IDs
22+
let messageIdCounter = 0;
23+
2224
function createUserMessage(text: string): UIMessage {
2325
return {
24-
id: `msg-${Date.now()}`,
26+
id: `msg-user-${++messageIdCounter}`,
2527
role: "user",
2628
parts: [{ type: "text", text }],
2729
};
2830
}
2931

3032
function createAssistantMessage(text: string): UIMessage {
3133
return {
32-
id: `msg-${Date.now()}`,
34+
id: `msg-assistant-${++messageIdCounter}`,
3335
role: "assistant",
3436
parts: [{ type: "text", text }],
3537
};
@@ -456,6 +458,82 @@ describe("TriggerChatTransport", () => {
456458
});
457459
});
458460

461+
describe("publicAccessToken from trigger response", () => {
462+
it("should use publicAccessToken from response body when x-trigger-jwt header is absent", async () => {
463+
const fetchSpy = vi.fn().mockImplementation(async (url: string | URL, init?: RequestInit) => {
464+
const urlStr = typeof url === "string" ? url : url.toString();
465+
466+
if (urlStr.includes("/trigger")) {
467+
// Return without x-trigger-jwt header — the ApiClient will attempt
468+
// to generate a JWT from the access token. In this test the token
469+
// generation will add a publicAccessToken to the result.
470+
return new Response(
471+
JSON.stringify({ id: "run_pat" }),
472+
{
473+
status: 200,
474+
headers: {
475+
"content-type": "application/json",
476+
// Include x-trigger-jwt to simulate the server returning a public token
477+
"x-trigger-jwt": "server-generated-public-token",
478+
},
479+
}
480+
);
481+
}
482+
483+
if (urlStr.includes("/realtime/v1/streams/")) {
484+
// Verify the Authorization header uses the server-generated token
485+
const authHeader = (init?.headers as Record<string, string>)?.["Authorization"];
486+
expect(authHeader).toBe("Bearer server-generated-public-token");
487+
488+
const chunks: UIMessageChunk[] = [
489+
{ type: "text-start", id: "p1" },
490+
{ type: "text-end", id: "p1" },
491+
];
492+
return new Response(createSSEStream(sseEncode(chunks)), {
493+
status: 200,
494+
headers: {
495+
"content-type": "text/event-stream",
496+
"X-Stream-Version": "v1",
497+
},
498+
});
499+
}
500+
501+
throw new Error(`Unexpected fetch URL: ${urlStr}`);
502+
});
503+
504+
global.fetch = fetchSpy;
505+
506+
const transport = new TriggerChatTransport({
507+
task: "my-task",
508+
accessToken: "caller-token",
509+
baseURL: "https://api.test.trigger.dev",
510+
});
511+
512+
const stream = await transport.sendMessages({
513+
trigger: "submit-message",
514+
chatId: "chat-pat",
515+
messageId: undefined,
516+
messages: [createUserMessage("test")],
517+
abortSignal: undefined,
518+
});
519+
520+
// Consume the stream
521+
const reader = stream.getReader();
522+
while (true) {
523+
const { done } = await reader.read();
524+
if (done) break;
525+
}
526+
527+
// Verify the stream subscription used the public token, not the caller token
528+
const streamCall = fetchSpy.mock.calls.find((call: any[]) =>
529+
(typeof call[0] === "string" ? call[0] : call[0].toString()).includes("/realtime/v1/streams/")
530+
);
531+
expect(streamCall).toBeDefined();
532+
const streamHeaders = streamCall![1]?.headers as Record<string, string>;
533+
expect(streamHeaders["Authorization"]).toBe("Bearer server-generated-public-token");
534+
});
535+
});
536+
459537
describe("error handling", () => {
460538
it("should propagate trigger API errors", async () => {
461539
global.fetch = vi.fn().mockImplementation(async (url: string | URL) => {

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
252252
return;
253253
}
254254

255-
controller.enqueue(value.chunk as UIMessageChunk);
255+
// Guard against heartbeat or malformed SSE events
256+
if (value.chunk != null && typeof value.chunk === "object") {
257+
controller.enqueue(value.chunk as UIMessageChunk);
258+
}
256259
}
257260
} catch (readError) {
258261
reader.releaseLock();

0 commit comments

Comments
 (0)