Skip to content

Commit 382577f

Browse files
Avoid masking transport failures when run-store cleanup fails
Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent 90198de commit 382577f

File tree

2 files changed

+121
-4
lines changed

2 files changed

+121
-4
lines changed

packages/ai/src/chatTransport.test.ts

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,58 @@ describe("TriggerChatTransport", function () {
13071307
}
13081308
);
13091309

1310+
it("preserves stream subscribe failures when cleanup run-store set throws", async function () {
1311+
const errors: TriggerChatTransportError[] = [];
1312+
const runStore = new FailingCleanupSetRunStore(2);
1313+
1314+
const server = await startServer(function (req, res) {
1315+
if (req.method === "POST" && req.url === "/api/v1/tasks/chat-task/trigger") {
1316+
res.writeHead(200, {
1317+
"content-type": "application/json",
1318+
"x-trigger-jwt": "pk_stream_subscribe_cleanup_set_failure",
1319+
});
1320+
res.end(JSON.stringify({ id: "run_stream_subscribe_cleanup_set_failure" }));
1321+
return;
1322+
}
1323+
1324+
res.writeHead(404);
1325+
res.end();
1326+
});
1327+
1328+
const transport = new TriggerChatTransport({
1329+
task: "chat-task",
1330+
stream: "chat-stream",
1331+
accessToken: "pk_trigger",
1332+
baseURL: server.url,
1333+
runStore,
1334+
onError: function onError(error) {
1335+
errors.push(error);
1336+
},
1337+
});
1338+
1339+
(transport as any).fetchRunStream = async function fetchRunStream() {
1340+
throw new Error("stream subscribe root cause");
1341+
};
1342+
1343+
await expect(
1344+
transport.sendMessages({
1345+
trigger: "submit-message",
1346+
chatId: "chat-stream-subscribe-cleanup-set-failure",
1347+
messageId: undefined,
1348+
messages: [],
1349+
abortSignal: undefined,
1350+
})
1351+
).rejects.toThrowError("stream subscribe root cause");
1352+
1353+
expect(errors).toHaveLength(1);
1354+
expect(errors[0]).toMatchObject({
1355+
phase: "streamSubscribe",
1356+
chatId: "chat-stream-subscribe-cleanup-set-failure",
1357+
runId: "run_stream_subscribe_cleanup_set_failure",
1358+
});
1359+
expect(errors[0]?.error.message).toBe("stream subscribe root cause");
1360+
});
1361+
13101362
it("cleans up async run-store state when stream subscription fails", async function () {
13111363
const runStore = new AsyncTrackedRunStore();
13121364

@@ -1900,6 +1952,46 @@ describe("TriggerChatTransport", function () {
19001952
expect(runStore.get("chat-reconnect-error")).toBeUndefined();
19011953
});
19021954

1955+
it("preserves reconnect failures when cleanup run-store set throws", async function () {
1956+
const errors: TriggerChatTransportError[] = [];
1957+
const runStore = new FailingCleanupSetRunStore(2);
1958+
runStore.set({
1959+
chatId: "chat-reconnect-cleanup-set-failure",
1960+
runId: "run_reconnect_cleanup_set_failure",
1961+
publicAccessToken: "pk_reconnect_cleanup_set_failure",
1962+
streamKey: "chat-stream",
1963+
lastEventId: "100-0",
1964+
isActive: true,
1965+
});
1966+
1967+
const transport = new TriggerChatTransport({
1968+
task: "chat-task",
1969+
stream: "chat-stream",
1970+
accessToken: "pk_trigger",
1971+
runStore,
1972+
onError: function onError(error) {
1973+
errors.push(error);
1974+
},
1975+
});
1976+
1977+
(transport as any).fetchRunStream = async function fetchRunStream() {
1978+
throw new Error("reconnect root cause");
1979+
};
1980+
1981+
const stream = await transport.reconnectToStream({
1982+
chatId: "chat-reconnect-cleanup-set-failure",
1983+
});
1984+
1985+
expect(stream).toBeNull();
1986+
expect(errors).toHaveLength(1);
1987+
expect(errors[0]).toMatchObject({
1988+
phase: "reconnect",
1989+
chatId: "chat-reconnect-cleanup-set-failure",
1990+
runId: "run_reconnect_cleanup_set_failure",
1991+
});
1992+
expect(errors[0]?.error.message).toBe("reconnect root cause");
1993+
});
1994+
19031995
it("normalizes non-Error reconnect failures before reporting onError", async function () {
19041996
const errors: TriggerChatTransportError[] = [];
19051997
const runStore = new InMemoryTriggerChatRunStore();
@@ -2377,6 +2469,23 @@ class TrackedRunStore extends InMemoryTriggerChatRunStore {
23772469
}
23782470
}
23792471

2472+
class FailingCleanupSetRunStore extends InMemoryTriggerChatRunStore {
2473+
private setCalls = 0;
2474+
2475+
constructor(private readonly failOnSetCall: number) {
2476+
super();
2477+
}
2478+
2479+
public set(state: TriggerChatRunState): void {
2480+
this.setCalls += 1;
2481+
if (this.setCalls === this.failOnSetCall) {
2482+
throw new Error("cleanup set failed");
2483+
}
2484+
2485+
super.set(state);
2486+
}
2487+
}
2488+
23802489
class AsyncTrackedRunStore implements TriggerChatRunStore {
23812490
private readonly runs = new Map<string, TriggerChatRunState>();
23822491
public readonly getCalls: string[] = [];

packages/ai/src/chatTransport.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ export class TriggerChatTransport<
235235
try {
236236
stream = await this.fetchRunStream(runState, options.abortSignal);
237237
} catch (error) {
238-
await this.markRunInactiveAndDelete(runState);
238+
await this.tryMarkRunInactiveAndDelete(runState);
239239
await this.reportError({
240240
phase: "streamSubscribe",
241241
chatId: runState.chatId,
@@ -266,7 +266,7 @@ export class TriggerChatTransport<
266266
try {
267267
stream = await this.fetchRunStream(runState, undefined, runState.lastEventId);
268268
} catch (error) {
269-
await this.markRunInactiveAndDelete(runState);
269+
await this.tryMarkRunInactiveAndDelete(runState);
270270
await this.reportError({
271271
phase: "reconnect",
272272
chatId: runState.chatId,
@@ -345,12 +345,12 @@ export class TriggerChatTransport<
345345

346346
const runState = await this.runStore.get(chatId);
347347
if (runState) {
348-
await this.markRunInactiveAndDelete(runState);
348+
await this.tryMarkRunInactiveAndDelete(runState);
349349
}
350350
} catch (error) {
351351
const runState = await this.runStore.get(chatId);
352352
if (runState) {
353-
await this.markRunInactiveAndDelete(runState);
353+
await this.tryMarkRunInactiveAndDelete(runState);
354354
await this.reportError({
355355
phase: "consumeTrackingStream",
356356
chatId: runState.chatId,
@@ -389,6 +389,14 @@ export class TriggerChatTransport<
389389
await this.runStore.delete(runState.chatId);
390390
}
391391

392+
private async tryMarkRunInactiveAndDelete(runState: TriggerChatRunState) {
393+
try {
394+
await this.markRunInactiveAndDelete(runState);
395+
} catch {
396+
// Best effort cleanup only; never mask the original transport failure.
397+
}
398+
}
399+
392400
private async reportError(event: TriggerChatTransportError) {
393401
if (!this.onError) {
394402
return;

0 commit comments

Comments
 (0)