Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 28 additions & 27 deletions web-common/src/features/chat/core/conversation-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,17 @@ export class ConversationManager {
private static readonly MAX_CONCURRENT_STREAMS = 3;

private newConversation: Conversation;
private newConversationUnsub: (() => void) | null = null;
private conversations = new Map<string, Conversation>();
private conversationSelector: ConversationSelector;
private agent?: string;
private readonly agent?: string;

constructor(
public readonly instanceId: string,
options: ConversationManagerOptions,
) {
this.agent = options.agent;
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
onConversationCreated: (conversationId: string) => {
this.handleConversationCreated(conversationId);
},
},
);
this.createNewConversation();

switch (options.conversationState) {
case "url":
Expand Down Expand Up @@ -121,10 +112,10 @@ export class ConversationManager {
const conversation = new Conversation(
this.instanceId,
$conversationId,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
},
this.agent,
);
conversation.on("stream-start", () =>
this.enforceMaxConcurrentStreams(),
);
this.conversations.set($conversationId, conversation);
return conversation;
Expand Down Expand Up @@ -162,6 +153,26 @@ export class ConversationManager {

// ===== PRIVATE IMPLEMENTATION =====

private createNewConversation() {
this.newConversationUnsub?.();
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
this.agent,
);
const streamStartUnsub = this.newConversation.on("stream-start", () =>
this.enforceMaxConcurrentStreams(),
);
const conversationStartedUnsub = this.newConversation.on(
"conversation-created",
(conversationId) => this.handleConversationCreated(conversationId),
);
this.newConversationUnsub = () => {
streamStartUnsub();
conversationStartedUnsub();
};
}

// ----- Stream Management -----

/**
Expand Down Expand Up @@ -221,17 +232,7 @@ export class ConversationManager {
this.conversations.set(conversationId, this.newConversation);

// Create a fresh "new" conversation instance
this.newConversation = new Conversation(
this.instanceId,
NEW_CONVERSATION_ID,
{
agent: this.agent,
onStreamStart: () => this.enforceMaxConcurrentStreams(),
onConversationCreated: (conversationId: string) => {
this.handleConversationCreated(conversationId);
},
},
);
this.createNewConversation();
}
}

Expand Down
56 changes: 23 additions & 33 deletions web-common/src/features/chat/core/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,23 @@ import {
invalidateConversationsList,
NEW_CONVERSATION_ID,
} from "./utils";
import { EventEmitter } from "@rilldata/web-common/lib/event-emitter.ts";

type ConversionEvents = {
"conversation-created": string;
"stream-start": void;
message: V1Message;
"stream-complete": string;
error: string;
};

/**
* Individual conversation state management.
*
* Handles streaming message sending, optimistic updates, and conversation-specific queries
* for a single conversation using the streaming completion endpoint.
*/
export class Conversation {
export class Conversation extends EventEmitter<ConversionEvents> {
// Public reactive state
public readonly draftMessage = writable<string>("");
public readonly isStreaming = writable(false);
Expand All @@ -44,17 +53,9 @@ export class Conversation {
constructor(
private readonly instanceId: string,
public conversationId: string,
private readonly options: {
agent?: string;
onStreamStart?: () => void;
onConversationCreated?: (conversationId: string) => void;
} = {
agent: ToolName.ANALYST_AGENT, // Hardcoded default for now
},
private readonly agent: string = ToolName.ANALYST_AGENT, // Hardcoded default for now
) {
if (this.options) {
this.options.agent ??= ToolName.ANALYST_AGENT;
}
super();
}

// ===== PUBLIC API =====
Expand Down Expand Up @@ -114,15 +115,7 @@ export class Conversation {
* @param context - Chat context to be sent with the message
* @param options - Callback functions for different stages of message sending
*/
public async sendMessage(
context: RuntimeServiceCompleteBody,
options?: {
onStreamStart?: () => void;
onMessage?: (message: V1Message) => void;
onStreamComplete?: (conversationId: string) => void;
onError?: (error: string) => void;
},
): Promise<void> {
public async sendMessage(context: RuntimeServiceCompleteBody): Promise<void> {
// Prevent concurrent message sending
if (get(this.isStreaming)) {
this.streamError.set("Please wait for the current response to complete");
Expand All @@ -141,19 +134,15 @@ export class Conversation {
const userMessage = this.addOptimisticUserMessage(prompt);

try {
options?.onStreamStart?.();
this.emit("stream-start");
// Start streaming - this establishes the connection
const streamPromise = this.startStreaming(
prompt,
context,
options?.onMessage,
);
const streamPromise = this.startStreaming(prompt, context);

// Wait for streaming to complete
await streamPromise;

// Stream has completed successfully
options?.onStreamComplete?.(this.conversationId);
this.emit("stream-complete", this.conversationId);

// Temporary fix to make sure the title of the conversation is updated.
void invalidateConversationsList(this.instanceId);
Expand All @@ -171,7 +160,7 @@ export class Conversation {
userMessage,
this.hasReceivedFirstMessage,
);
options?.onError?.(this.formatTransportError(error));
this.emit("error", this.formatTransportError(error));
} finally {
this.isStreaming.set(false);
}
Expand Down Expand Up @@ -200,6 +189,8 @@ export class Conversation {
this.sseClient.cleanup();
this.sseClient = null;
}

this.clearListeners();
}

// ===== PRIVATE IMPLEMENTATION =====
Expand All @@ -213,7 +204,6 @@ export class Conversation {
private async startStreaming(
prompt: string,
context: RuntimeServiceCompleteBody | undefined,
onMessage: ((message: V1Message) => void) | undefined,
): Promise<void> {
// Initialize SSE client if not already done
if (!this.sseClient) {
Expand All @@ -238,7 +228,7 @@ export class Conversation {
message.data,
);
this.processStreamingResponse(response);
if (response.message) onMessage?.(response.message);
if (response.message) this.emit("message", response.message);
} catch (error) {
console.error("Failed to parse streaming response:", error);
this.streamError.set("Failed to process server response");
Expand Down Expand Up @@ -276,12 +266,12 @@ export class Conversation {
? undefined
: this.conversationId,
prompt,
agent: this.options?.agent,
agent: this.agent,
...context,
};

// Notify that streaming is about to start (for concurrent stream management)
this.options?.onStreamStart?.();
this.emit("stream-start");

// Start streaming - this will establish the connection and then stream until completion
await this.sseClient.start(baseUrl, {
Expand Down Expand Up @@ -360,7 +350,7 @@ export class Conversation {
this.conversationId = realConversationId;

// Notify that conversation was created
this.options?.onConversationCreated?.(realConversationId);
this.emit("conversation-created", realConversationId);
}

// ----- Cache Management -----
Expand Down
13 changes: 10 additions & 3 deletions web-common/src/features/chat/core/input/ChatInput.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
$: draftMessageStore = currentConversation.draftMessage;
$: isStreamingStore = currentConversation.isStreaming;

let streamStartUnsub: (() => void) | undefined = undefined;
$: {
streamStartUnsub?.();
streamStartUnsub = currentConversation.on("stream-start", () => {
editor.commands.setContent("");
});
}

$: value = $draftMessageStore;
$: disabled = $getConversationQuery?.isLoading || $isStreamingStore;
$: canSend = !disabled && value.trim();
Expand All @@ -40,9 +48,7 @@

// Message handling with input focus
try {
await currentConversation.sendMessage($additionalContextStore, {
onStreamStart: () => editor.commands.setContent(""),
});
await currentConversation.sendMessage($additionalContextStore);
onSend?.();
} catch (error) {
console.error("Failed to send message:", error);
Expand Down Expand Up @@ -108,6 +114,7 @@
chatMounted.set(false);
editor.destroy();
unsubStartChatEvent();
streamStartUnsub?.();
};
});
</script>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export class FileAndResourceWatcher {

await invalidate("init");

eventBus.emit("rill-yaml-updated", null);
eventBus.emit("rill-yaml-updated");
}
this.seenFiles.add(res.path);
break;
Expand Down
12 changes: 8 additions & 4 deletions web-common/src/features/sample-data/generate-sample-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ export async function generateSampleData(
},
},
});
const conversation = new Conversation(instanceId, NEW_CONVERSATION_ID, {
agent: ToolName.DEVELOPER_AGENT,
});
const conversation = new Conversation(
instanceId,
NEW_CONVERSATION_ID,
ToolName.DEVELOPER_AGENT,
);
const agentPrompt = `Generate a NEW model with fresh data for the following user prompt: ${userPrompt}`;
conversation.draftMessage.set(agentPrompt);

Expand Down Expand Up @@ -124,15 +126,17 @@ export async function generateSampleData(
}
}
};
const handleMessageUnsub = conversation.on("message", handleMessage);

let cancelled = false;

conversation.cancelStream();

await conversation.sendMessage({}, { onMessage: handleMessage });
await conversation.sendMessage({});

await waitUntil(() => !get(conversation.isStreaming));

handleMessageUnsub();
overlay.set(null);
if (cancelled) return;
if (!created) {
Expand Down
2 changes: 1 addition & 1 deletion web-common/src/lib/actions/modified-click.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function modified(params: Params) {
}

const event = (modifier ? `${modifier}-click` : "click") as keyof Events;
eventBus.emit(event, null);
eventBus.emit(event);

if (handler) {
await handler(e);
Expand Down
Loading
Loading