Skip to content

Commit 654bab0

Browse files
Cover cleanup delete failures across stream subscribe and reconnect
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent b912e4f commit 654bab0

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,58 @@ describe("TriggerChatTransport", function () {
13591359
expect(errors[0]?.error.message).toBe("stream subscribe root cause");
13601360
});
13611361

1362+
it("preserves stream subscribe failures when cleanup run-store delete throws", async function () {
1363+
const errors: TriggerChatTransportError[] = [];
1364+
const runStore = new FailingCleanupDeleteRunStore(1);
1365+
1366+
const server = await startServer(function (req, res) {
1367+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1368+
res.writeHead(200, {
1369+
"content-type": "application/json",
1370+
"x-trigger-jwt": "pk_stream_subscribe_cleanup_delete_failure",
1371+
});
1372+
res.end(JSON.stringify({ id: "run_stream_subscribe_cleanup_delete_failure" }));
1373+
return;
1374+
}
1375+
1376+
res.writeHead(404);
1377+
res.end();
1378+
});
1379+
1380+
const transport = new TriggerChatTransport({
1381+
task: "chat-task",
1382+
stream: "chat-stream",
1383+
accessToken: "pk_trigger",
1384+
baseURL: server.url,
1385+
runStore,
1386+
onError: function onError(error) {
1387+
errors.push(error);
1388+
},
1389+
});
1390+
1391+
(transport as any).fetchRunStream = async function fetchRunStream() {
1392+
throw new Error("stream subscribe root cause");
1393+
};
1394+
1395+
await expect(
1396+
transport.sendMessages({
1397+
trigger: "submit-message",
1398+
chatId: "chat-stream-subscribe-cleanup-delete-failure",
1399+
messageId: undefined,
1400+
messages: [],
1401+
abortSignal: undefined,
1402+
})
1403+
).rejects.toThrowError("stream subscribe root cause");
1404+
1405+
expect(errors).toHaveLength(1);
1406+
expect(errors[0]).toMatchObject({
1407+
phase: "streamSubscribe",
1408+
chatId: "chat-stream-subscribe-cleanup-delete-failure",
1409+
runId: "run_stream_subscribe_cleanup_delete_failure",
1410+
});
1411+
expect(errors[0]?.error.message).toBe("stream subscribe root cause");
1412+
});
1413+
13621414
it("cleans up async run-store state when stream subscription fails", async function () {
13631415
const runStore = new AsyncTrackedRunStore();
13641416

@@ -1992,6 +2044,46 @@ describe("TriggerChatTransport", function () {
19922044
expect(errors[0]?.error.message).toBe("reconnect root cause");
19932045
});
19942046

2047+
it("preserves reconnect failures when cleanup run-store delete throws", async function () {
2048+
const errors: TriggerChatTransportError[] = [];
2049+
const runStore = new FailingCleanupDeleteRunStore(1);
2050+
runStore.set({
2051+
chatId: "chat-reconnect-cleanup-delete-failure",
2052+
runId: "run_reconnect_cleanup_delete_failure",
2053+
publicAccessToken: "pk_reconnect_cleanup_delete_failure",
2054+
streamKey: "chat-stream",
2055+
lastEventId: "100-0",
2056+
isActive: true,
2057+
});
2058+
2059+
const transport = new TriggerChatTransport({
2060+
task: "chat-task",
2061+
stream: "chat-stream",
2062+
accessToken: "pk_trigger",
2063+
runStore,
2064+
onError: function onError(error) {
2065+
errors.push(error);
2066+
},
2067+
});
2068+
2069+
(transport as any).fetchRunStream = async function fetchRunStream() {
2070+
throw new Error("reconnect root cause");
2071+
};
2072+
2073+
const stream = await transport.reconnectToStream({
2074+
chatId: "chat-reconnect-cleanup-delete-failure",
2075+
});
2076+
2077+
expect(stream).toBeNull();
2078+
expect(errors).toHaveLength(1);
2079+
expect(errors[0]).toMatchObject({
2080+
phase: "reconnect",
2081+
chatId: "chat-reconnect-cleanup-delete-failure",
2082+
runId: "run_reconnect_cleanup_delete_failure",
2083+
});
2084+
expect(errors[0]?.error.message).toBe("reconnect root cause");
2085+
});
2086+
19952087
it("normalizes non-Error reconnect failures before reporting onError", async function () {
19962088
const errors: TriggerChatTransportError[] = [];
19972089
const runStore = new InMemoryTriggerChatRunStore();
@@ -2486,6 +2578,23 @@ class FailingCleanupSetRunStore extends InMemoryTriggerChatRunStore {
24862578
}
24872579
}
24882580

2581+
class FailingCleanupDeleteRunStore extends InMemoryTriggerChatRunStore {
2582+
private deleteCalls = 0;
2583+
2584+
constructor(private readonly failOnDeleteCall: number) {
2585+
super();
2586+
}
2587+
2588+
public delete(chatId: string): void {
2589+
this.deleteCalls += 1;
2590+
if (this.deleteCalls === this.failOnDeleteCall) {
2591+
throw new Error("cleanup delete failed");
2592+
}
2593+
2594+
super.delete(chatId);
2595+
}
2596+
}
2597+
24892598
class AsyncTrackedRunStore implements TriggerChatRunStore {
24902599
private readonly runs = new Map<string, TriggerChatRunState>();
24912600
public readonly getCalls: string[] = [];

0 commit comments

Comments
 (0)