Skip to content

Commit d4266a6

Browse files
Cover combined cleanup and onError failure resilience paths
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent e0219ce commit d4266a6

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,52 @@ describe("TriggerChatTransport", function () {
14111411
expect(errors[0]?.error.message).toBe("stream subscribe root cause");
14121412
});
14131413

1414+
it(
1415+
"preserves stream subscribe root failures when cleanup and onError callbacks both fail",
1416+
async function () {
1417+
const runStore = new FailingCleanupSetRunStore(2);
1418+
1419+
const server = await startServer(function (req, res) {
1420+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1421+
res.writeHead(200, {
1422+
"content-type": "application/json",
1423+
"x-trigger-jwt": "pk_stream_subscribe_cleanup_and_onerror_failure",
1424+
});
1425+
res.end(JSON.stringify({ id: "run_stream_subscribe_cleanup_and_onerror_failure" }));
1426+
return;
1427+
}
1428+
1429+
res.writeHead(404);
1430+
res.end();
1431+
});
1432+
1433+
const transport = new TriggerChatTransport({
1434+
task: "chat-task",
1435+
stream: "chat-stream",
1436+
accessToken: "pk_trigger",
1437+
baseURL: server.url,
1438+
runStore,
1439+
onError: async function onError() {
1440+
throw new Error("onError failed");
1441+
},
1442+
});
1443+
1444+
(transport as any).fetchRunStream = async function fetchRunStream() {
1445+
throw new Error("stream subscribe root cause");
1446+
};
1447+
1448+
await expect(
1449+
transport.sendMessages({
1450+
trigger: "submit-message",
1451+
chatId: "chat-stream-subscribe-cleanup-and-onerror-failure",
1452+
messageId: undefined,
1453+
messages: [],
1454+
abortSignal: undefined,
1455+
})
1456+
).rejects.toThrowError("stream subscribe root cause");
1457+
}
1458+
);
1459+
14141460
it("cleans up async run-store state when stream subscription fails", async function () {
14151461
const runStore = new AsyncTrackedRunStore();
14161462

@@ -2252,6 +2298,41 @@ describe("TriggerChatTransport", function () {
22522298
expect(errors[0]?.error.message).toBe("reconnect root cause");
22532299
});
22542300

2301+
it(
2302+
"preserves reconnect root failures when cleanup and onError callbacks both fail",
2303+
async function () {
2304+
const runStore = new FailingCleanupDeleteRunStore(1);
2305+
runStore.set({
2306+
chatId: "chat-reconnect-cleanup-and-onerror-failure",
2307+
runId: "run_reconnect_cleanup_and_onerror_failure",
2308+
publicAccessToken: "pk_reconnect_cleanup_and_onerror_failure",
2309+
streamKey: "chat-stream",
2310+
lastEventId: "100-0",
2311+
isActive: true,
2312+
});
2313+
2314+
const transport = new TriggerChatTransport({
2315+
task: "chat-task",
2316+
stream: "chat-stream",
2317+
accessToken: "pk_trigger",
2318+
runStore,
2319+
onError: async function onError() {
2320+
throw new Error("onError failed");
2321+
},
2322+
});
2323+
2324+
(transport as any).fetchRunStream = async function fetchRunStream() {
2325+
throw new Error("reconnect root cause");
2326+
};
2327+
2328+
const stream = await transport.reconnectToStream({
2329+
chatId: "chat-reconnect-cleanup-and-onerror-failure",
2330+
});
2331+
2332+
expect(stream).toBeNull();
2333+
}
2334+
);
2335+
22552336
it("normalizes non-Error reconnect failures before reporting onError", async function () {
22562337
const errors: TriggerChatTransportError[] = [];
22572338
const runStore = new InMemoryTriggerChatRunStore();

0 commit comments

Comments
 (0)