Skip to content

Commit 0ca459d

Browse files
feat: add chat transport and AI chat helpers to @trigger.dev/sdk
Two new subpath exports: @trigger.dev/sdk/chat (frontend, browser-safe): - TriggerChatTransport — ChatTransport implementation for useChat - createChatTransport() — factory function - TriggerChatTransportOptions type @trigger.dev/sdk/ai (backend, adds to existing ai.tool/ai.currentToolOptions): - chatTask() — pre-typed task wrapper with auto-pipe - pipeChat() — pipe StreamTextResult to realtime stream - CHAT_STREAM_KEY constant - ChatTaskPayload type - ChatTaskOptions type - PipeChatOptions type Co-authored-by: Eric Allam <eric@trigger.dev>
1 parent c3656a5 commit 0ca459d

File tree

6 files changed

+832
-12
lines changed

6 files changed

+832
-12
lines changed

packages/ai/src/chatTask.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { task as createTask } from "@trigger.dev/sdk";
2+
import type { Task } from "@trigger.dev/core/v3";
3+
import type { ChatTaskPayload } from "./types.js";
4+
import { pipeChat } from "./pipeChat.js";
5+
6+
/**
7+
* Options for defining a chat task.
8+
*
9+
* This is a simplified version of the standard task options with the payload
10+
* pre-typed as `ChatTaskPayload`.
11+
*/
12+
export type ChatTaskOptions<TIdentifier extends string> = {
13+
/** Unique identifier for the task */
14+
id: TIdentifier;
15+
16+
/** Optional description of the task */
17+
description?: string;
18+
19+
/** Retry configuration */
20+
retry?: {
21+
maxAttempts?: number;
22+
factor?: number;
23+
minTimeoutInMs?: number;
24+
maxTimeoutInMs?: number;
25+
randomize?: boolean;
26+
};
27+
28+
/** Queue configuration */
29+
queue?: {
30+
name?: string;
31+
concurrencyLimit?: number;
32+
};
33+
34+
/** Machine preset for the task */
35+
machine?: {
36+
preset?: string;
37+
};
38+
39+
/** Maximum duration in seconds */
40+
maxDuration?: number;
41+
42+
/**
43+
* The main run function for the chat task.
44+
*
45+
* Receives a `ChatTaskPayload` with the conversation messages, chat session ID,
46+
* and trigger type.
47+
*
48+
* **Auto-piping:** If this function returns a value that has a `.toUIMessageStream()` method
49+
* (like a `StreamTextResult` from `streamText()`), the stream will automatically be piped
50+
* to the frontend via the chat realtime stream. If you need to pipe from deeper in your
51+
* code, use `pipeChat()` instead and don't return the result.
52+
*/
53+
run: (payload: ChatTaskPayload) => Promise<unknown>;
54+
};
55+
56+
/**
57+
* An object that has a `toUIMessageStream()` method, like the result of `streamText()`.
58+
*/
59+
type UIMessageStreamable = {
60+
toUIMessageStream: (...args: any[]) => AsyncIterable<unknown> | ReadableStream<unknown>;
61+
};
62+
63+
function isUIMessageStreamable(value: unknown): value is UIMessageStreamable {
64+
return (
65+
typeof value === "object" &&
66+
value !== null &&
67+
"toUIMessageStream" in value &&
68+
typeof (value as any).toUIMessageStream === "function"
69+
);
70+
}
71+
72+
/**
73+
* Creates a Trigger.dev task pre-configured for AI SDK chat.
74+
*
75+
* This is a convenience wrapper around `task()` from `@trigger.dev/sdk` that:
76+
* - **Pre-types the payload** as `ChatTaskPayload` — no manual typing needed
77+
* - **Auto-pipes the stream** if the `run` function returns a `StreamTextResult`
78+
*
79+
* Requires `@trigger.dev/sdk` to be installed (it's a peer dependency).
80+
*
81+
* @example
82+
* ```ts
83+
* import { chatTask } from "@trigger.dev/ai";
84+
* import { streamText, convertToModelMessages } from "ai";
85+
* import { openai } from "@ai-sdk/openai";
86+
*
87+
* // Simple: return streamText result — auto-piped to the frontend
88+
* export const myChatTask = chatTask({
89+
* id: "my-chat-task",
90+
* run: async ({ messages }) => {
91+
* return streamText({
92+
* model: openai("gpt-4o"),
93+
* messages: convertToModelMessages(messages),
94+
* });
95+
* },
96+
* });
97+
* ```
98+
*
99+
* @example
100+
* ```ts
101+
* import { chatTask, pipeChat } from "@trigger.dev/ai";
102+
*
103+
* // Complex: use pipeChat() from deep inside your agent code
104+
* export const myAgentTask = chatTask({
105+
* id: "my-agent-task",
106+
* run: async ({ messages }) => {
107+
* await runComplexAgentLoop(messages);
108+
* // pipeChat() called internally by the agent loop
109+
* },
110+
* });
111+
* ```
112+
*/
113+
export function chatTask<TIdentifier extends string>(
114+
options: ChatTaskOptions<TIdentifier>
115+
): Task<TIdentifier, ChatTaskPayload, unknown> {
116+
const { run: userRun, ...restOptions } = options;
117+
118+
return createTask<TIdentifier, ChatTaskPayload, unknown>({
119+
...restOptions,
120+
run: async (payload: ChatTaskPayload) => {
121+
const result = await userRun(payload);
122+
123+
// If the run function returned a StreamTextResult or similar,
124+
// automatically pipe it to the chat stream
125+
if (isUIMessageStreamable(result)) {
126+
await pipeChat(result);
127+
}
128+
129+
return result;
130+
},
131+
});
132+
}

packages/ai/src/pipeChat.ts

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { realtimeStreams } from "@trigger.dev/core/v3";
2+
3+
/**
4+
* The default stream key used for chat transport communication.
5+
*
6+
* Both `TriggerChatTransport` (frontend) and `pipeChat` (backend) use this key
7+
* by default to ensure they communicate over the same stream.
8+
*/
9+
export const CHAT_STREAM_KEY = "chat";
10+
11+
/**
12+
* Options for `pipeChat`.
13+
*/
14+
export type PipeChatOptions = {
15+
/**
16+
* Override the stream key to pipe to.
17+
* Must match the `streamKey` option on `TriggerChatTransport`.
18+
*
19+
* @default "chat"
20+
*/
21+
streamKey?: string;
22+
23+
/**
24+
* An AbortSignal to cancel the stream.
25+
*/
26+
signal?: AbortSignal;
27+
28+
/**
29+
* The target run ID to pipe the stream to.
30+
* @default "self" (current run)
31+
*/
32+
target?: string;
33+
};
34+
35+
/**
36+
* An object that has a `toUIMessageStream()` method, like the result of `streamText()` from the AI SDK.
37+
*/
38+
type UIMessageStreamable = {
39+
toUIMessageStream: (...args: any[]) => AsyncIterable<unknown> | ReadableStream<unknown>;
40+
};
41+
42+
function isUIMessageStreamable(value: unknown): value is UIMessageStreamable {
43+
return (
44+
typeof value === "object" &&
45+
value !== null &&
46+
"toUIMessageStream" in value &&
47+
typeof (value as any).toUIMessageStream === "function"
48+
);
49+
}
50+
51+
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
52+
return (
53+
typeof value === "object" &&
54+
value !== null &&
55+
Symbol.asyncIterator in value
56+
);
57+
}
58+
59+
function isReadableStream(value: unknown): value is ReadableStream<unknown> {
60+
return (
61+
typeof value === "object" &&
62+
value !== null &&
63+
typeof (value as any).getReader === "function"
64+
);
65+
}
66+
67+
/**
68+
* Pipes a chat stream to the realtime stream, making it available to the
69+
* `TriggerChatTransport` on the frontend.
70+
*
71+
* Accepts any of:
72+
* - A `StreamTextResult` from the AI SDK (has `.toUIMessageStream()`)
73+
* - An `AsyncIterable` of `UIMessageChunk`s
74+
* - A `ReadableStream` of `UIMessageChunk`s
75+
*
76+
* This must be called from inside a Trigger.dev task's `run` function.
77+
*
78+
* @example
79+
* ```ts
80+
* import { task } from "@trigger.dev/sdk";
81+
* import { pipeChat, type ChatTaskPayload } from "@trigger.dev/ai";
82+
* import { streamText, convertToModelMessages } from "ai";
83+
*
84+
* export const myChatTask = task({
85+
* id: "my-chat-task",
86+
* run: async (payload: ChatTaskPayload) => {
87+
* const result = streamText({
88+
* model: openai("gpt-4o"),
89+
* messages: convertToModelMessages(payload.messages),
90+
* });
91+
*
92+
* await pipeChat(result);
93+
* },
94+
* });
95+
* ```
96+
*
97+
* @example
98+
* ```ts
99+
* // Deep inside your agent library — pipeChat works from anywhere inside a task
100+
* async function runAgentLoop(messages: CoreMessage[]) {
101+
* const result = streamText({ model, messages });
102+
* await pipeChat(result);
103+
* }
104+
* ```
105+
*
106+
* @param source - A StreamTextResult, AsyncIterable, or ReadableStream of UIMessageChunks
107+
* @param options - Optional configuration
108+
* @returns A promise that resolves when the stream has been fully piped
109+
*/
110+
export async function pipeChat(
111+
source: UIMessageStreamable | AsyncIterable<unknown> | ReadableStream<unknown>,
112+
options?: PipeChatOptions
113+
): Promise<void> {
114+
const streamKey = options?.streamKey ?? CHAT_STREAM_KEY;
115+
116+
// Resolve the source to an AsyncIterable or ReadableStream
117+
let stream: AsyncIterable<unknown> | ReadableStream<unknown>;
118+
119+
if (isUIMessageStreamable(source)) {
120+
stream = source.toUIMessageStream();
121+
} else if (isAsyncIterable(source) || isReadableStream(source)) {
122+
stream = source;
123+
} else {
124+
throw new Error(
125+
"pipeChat: source must be a StreamTextResult (with .toUIMessageStream()), " +
126+
"an AsyncIterable, or a ReadableStream"
127+
);
128+
}
129+
130+
// Pipe to the realtime stream
131+
const instance = realtimeStreams.pipe(streamKey, stream, {
132+
signal: options?.signal,
133+
target: options?.target,
134+
});
135+
136+
await instance.wait();
137+
}

packages/ai/src/types.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export type TriggerChatTransportOptions = {
88
* The Trigger.dev task ID to trigger for chat completions.
99
* This task will receive the chat messages as its payload.
1010
*/
11-
taskId: string;
11+
task: string;
1212

1313
/**
1414
* An access token for authenticating with the Trigger.dev API.
@@ -36,8 +36,8 @@ export type TriggerChatTransportOptions = {
3636

3737
/**
3838
* The stream key where the task pipes UIMessageChunk data.
39-
* Your task must pipe the AI SDK stream to this same key using
40-
* `streams.pipe(streamKey, result.toUIMessageStream())`.
39+
* When using `chatTask()` or `pipeChat()`, this is handled automatically.
40+
* Only set this if you're using a custom stream key.
4141
*
4242
* @default "chat"
4343
*/
@@ -59,15 +59,16 @@ export type TriggerChatTransportOptions = {
5959
};
6060

6161
/**
62-
* The payload shape that TriggerChatTransport sends to the triggered task.
62+
* The payload shape that the transport sends to the triggered task.
6363
*
64-
* Use this type to type your task's `run` function payload:
64+
* When using `chatTask()`, the payload is automatically typed — you don't need
65+
* to import this type. When using `task()` directly, use this type to annotate
66+
* your payload:
6567
*
6668
* @example
6769
* ```ts
68-
* import { task, streams } from "@trigger.dev/sdk";
69-
* import { streamText, convertToModelMessages } from "ai";
70-
* import type { ChatTaskPayload } from "@trigger.dev/ai";
70+
* import { task } from "@trigger.dev/sdk";
71+
* import { pipeChat, type ChatTaskPayload } from "@trigger.dev/ai";
7172
*
7273
* export const myChatTask = task({
7374
* id: "my-chat-task",
@@ -76,9 +77,7 @@ export type TriggerChatTransportOptions = {
7677
* model: openai("gpt-4o"),
7778
* messages: convertToModelMessages(payload.messages),
7879
* });
79-
*
80-
* const { waitUntilComplete } = streams.pipe("chat", result.toUIMessageStream());
81-
* await waitUntilComplete();
80+
* await pipeChat(result);
8281
* },
8382
* });
8483
* ```
@@ -110,6 +109,7 @@ export type ChatTaskPayload<TMessage extends UIMessage = UIMessage> = {
110109

111110
/**
112111
* Internal state for tracking active chat sessions, used for stream reconnection.
112+
* @internal
113113
*/
114114
export type ChatSessionState = {
115115
runId: string;

packages/trigger-sdk/package.json

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
"./package.json": "./package.json",
2525
".": "./src/v3/index.ts",
2626
"./v3": "./src/v3/index.ts",
27-
"./ai": "./src/v3/ai.ts"
27+
"./ai": "./src/v3/ai.ts",
28+
"./chat": "./src/v3/chat.ts"
2829
},
2930
"sourceDialects": [
3031
"@triggerdotdev/source"
@@ -37,6 +38,9 @@
3738
],
3839
"ai": [
3940
"dist/commonjs/v3/ai.d.ts"
41+
],
42+
"chat": [
43+
"dist/commonjs/v3/chat.d.ts"
4044
]
4145
}
4246
},
@@ -123,6 +127,17 @@
123127
"types": "./dist/commonjs/v3/ai.d.ts",
124128
"default": "./dist/commonjs/v3/ai.js"
125129
}
130+
},
131+
"./chat": {
132+
"import": {
133+
"@triggerdotdev/source": "./src/v3/chat.ts",
134+
"types": "./dist/esm/v3/chat.d.ts",
135+
"default": "./dist/esm/v3/chat.js"
136+
},
137+
"require": {
138+
"types": "./dist/commonjs/v3/chat.d.ts",
139+
"default": "./dist/commonjs/v3/chat.js"
140+
}
126141
}
127142
},
128143
"main": "./dist/commonjs/v3/index.js",

0 commit comments

Comments
 (0)