Skip to content

Commit bc1a10c

Browse files
Ignore onTriggeredRun callback failures during streaming
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent e8222f7 commit bc1a10c

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

packages/ai/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class MemoryStore implements TriggerChatRunStore {
117117
```
118118

119119
`onTriggeredRun` can also be async, which is useful for persisting run IDs before
120-
the chat stream is consumed.
120+
the chat stream is consumed. Callback failures are ignored so chat streaming can continue.
121121

122122
## `ai.tool(...)` example
123123

packages/ai/src/chatTransport.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,68 @@ describe("TriggerChatTransport", function () {
688688
expect(callbackCompleted).toBe(true);
689689
});
690690

691+
it("continues streaming when onTriggeredRun callback throws", async function () {
692+
let callbackCalled = false;
693+
694+
const server = await startServer(function (req, res) {
695+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
696+
res.writeHead(200, {
697+
"content-type": "application/json",
698+
"x-trigger-jwt": "pk_run_callback_error",
699+
});
700+
res.end(JSON.stringify({ id: "run_callback_error" }));
701+
return;
702+
}
703+
704+
if (
705+
req.method === "GET" &&
706+
req.url === "/realtime/v1/streams/run_callback_error/chat-stream"
707+
) {
708+
res.writeHead(200, {
709+
"content-type": "text/event-stream",
710+
});
711+
writeSSE(
712+
res,
713+
"1-0",
714+
JSON.stringify({ type: "text-start", id: "callback_error_1" })
715+
);
716+
writeSSE(
717+
res,
718+
"2-0",
719+
JSON.stringify({ type: "text-end", id: "callback_error_1" })
720+
);
721+
res.end();
722+
return;
723+
}
724+
725+
res.writeHead(404);
726+
res.end();
727+
});
728+
729+
const transport = new TriggerChatTransport({
730+
task: "chat-task",
731+
stream: "chat-stream",
732+
accessToken: "pk_trigger",
733+
baseURL: server.url,
734+
onTriggeredRun: async function onTriggeredRun() {
735+
callbackCalled = true;
736+
throw new Error("callback failed");
737+
},
738+
});
739+
740+
const stream = await transport.sendMessages({
741+
trigger: "submit-message",
742+
chatId: "chat-callback-error",
743+
messageId: undefined,
744+
messages: [],
745+
abortSignal: undefined,
746+
});
747+
748+
const chunks = await readChunks(stream);
749+
expect(callbackCalled).toBe(true);
750+
expect(chunks).toHaveLength(2);
751+
});
752+
691753
it("cleans run store state when stream completes", async function () {
692754
const trackedRunStore = new TrackedRunStore();
693755

packages/ai/src/chatTransport.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,11 @@ export class TriggerChatTransport<
180180
await this.runStore.set(runState);
181181

182182
if (this.onTriggeredRun) {
183-
await this.onTriggeredRun(runState);
183+
try {
184+
await this.onTriggeredRun(runState);
185+
} catch {
186+
// Ignore callback errors so chat streaming can continue.
187+
}
184188
}
185189

186190
const stream = await this.fetchRunStream(runState, options.abortSignal);

0 commit comments

Comments
 (0)