Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/browser/components/AIView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ const AIViewInner: React.FC<AIViewProps> = ({
}
}, [workspaceId, workspaceState?.queuedMessage, chatInputAPI]);

// Handler for sending queued message immediately (interrupt + send)
const handleSendQueuedImmediately = useCallback(async () => {
if (!workspaceState?.queuedMessage || !workspaceState.canInterrupt) return;
await window.api.workspace.interruptStream(workspaceId, { sendQueuedImmediately: true });
}, [workspaceId, workspaceState?.queuedMessage, workspaceState?.canInterrupt]);

const handleEditLastUserMessage = useCallback(async () => {
if (!workspaceState) return;

Expand Down Expand Up @@ -562,6 +568,9 @@ const AIViewInner: React.FC<AIViewProps> = ({
<QueuedMessage
message={workspaceState.queuedMessage}
onEdit={() => void handleEditQueuedMessage()}
onSendImmediately={
workspaceState.canInterrupt ? handleSendQueuedImmediately : undefined
}
/>
)}
<ConcurrentLocalWarning
Expand Down
41 changes: 38 additions & 3 deletions src/browser/components/Messages/QueuedMessage.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
import React from "react";
import React, { useCallback, useState } from "react";
import type { ButtonConfig } from "./MessageWindow";
import { MessageWindow } from "./MessageWindow";
import type { QueuedMessage as QueuedMessageType } from "@/common/types/message";
import { Pencil } from "lucide-react";
import { Tooltip, TooltipWrapper } from "../Tooltip";

interface QueuedMessageProps {
message: QueuedMessageType;
className?: string;
onEdit?: () => void;
onSendImmediately?: () => Promise<void>;
}

export const QueuedMessage: React.FC<QueuedMessageProps> = ({ message, className, onEdit }) => {
export const QueuedMessage: React.FC<QueuedMessageProps> = ({
message,
className,
onEdit,
onSendImmediately,
}) => {
const { content } = message;
const [isSending, setIsSending] = useState(false);

const handleSendImmediately = useCallback(async () => {
if (isSending || !onSendImmediately) return;
setIsSending(true);
try {
await onSendImmediately();
} finally {
setIsSending(false);
}
}, [isSending, onSendImmediately]);

const buttons: ButtonConfig[] = onEdit
? [
Expand All @@ -23,10 +41,27 @@ export const QueuedMessage: React.FC<QueuedMessageProps> = ({ message, className
]
: [];

// Clickable "Queued" label with tooltip
const queuedLabel = onSendImmediately ? (
<TooltipWrapper inline>
<button
type="button"
onClick={() => void handleSendImmediately()}
disabled={isSending}
className="cursor-pointer hover:underline disabled:cursor-not-allowed disabled:opacity-50"
>
Queued
</button>
<Tooltip align="center">Click to send immediately</Tooltip>
</TooltipWrapper>
) : (
"Queued"
);

return (
<>
<MessageWindow
label="Queued"
label={queuedLabel}
variant="user"
message={message}
className={className}
Expand Down
2 changes: 1 addition & 1 deletion src/common/types/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ export interface IPCApi {
): Promise<Result<void, SendMessageError>>;
interruptStream(
workspaceId: string,
options?: { abandonPartial?: boolean }
options?: { abandonPartial?: boolean; sendQueuedImmediately?: boolean }
): Promise<Result<void, string>>;
clearQueue(workspaceId: string): Promise<Result<void, string>>;
truncateHistory(workspaceId: string, percentage?: number): Promise<Result<void, string>>;
Expand Down
6 changes: 4 additions & 2 deletions src/desktop/preload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ const api: IPCApi = {
ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_SEND_MESSAGE, workspaceId, message, options),
resumeStream: (workspaceId, options) =>
ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_RESUME_STREAM, workspaceId, options),
interruptStream: (workspaceId: string, options?: { abandonPartial?: boolean }) =>
ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, workspaceId, options),
interruptStream: (
workspaceId: string,
options?: { abandonPartial?: boolean; sendQueuedImmediately?: boolean }
) => ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM, workspaceId, options),
clearQueue: (workspaceId: string) =>
ipcRenderer.invoke(IPC_CHANNELS.WORKSPACE_CLEAR_QUEUE, workspaceId),
truncateHistory: (workspaceId, percentage) =>
Expand Down
4 changes: 2 additions & 2 deletions src/node/services/agentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,9 @@ export class AgentSession {

/**
* Send queued messages if any exist.
* Called when tool execution completes or stream ends.
* Called when tool execution completes, stream ends, or user clicks send immediately.
*/
private sendQueuedMessages(): void {
sendQueuedMessages(): void {
if (!this.messageQueue.isEmpty()) {
const { message, options } = this.messageQueue.produceMessage();
this.messageQueue.clear();
Expand Down
13 changes: 11 additions & 2 deletions src/node/services/ipcMain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,11 @@ export class IpcMain {

ipcMain.handle(
IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM,
async (_event, workspaceId: string, options?: { abandonPartial?: boolean }) => {
async (
_event,
workspaceId: string,
options?: { abandonPartial?: boolean; sendQueuedImmediately?: boolean }
) => {
log.debug("interruptStream handler: Received", { workspaceId, options });
try {
const session = this.getOrCreateSession(workspaceId);
Expand All @@ -1291,7 +1295,12 @@ export class IpcMain {
return { success: false, error: stopResult.error };
}

session.restoreQueueToInput();
if (options?.sendQueuedImmediately) {
// Send queued messages immediately instead of restoring to input
session.sendQueuedMessages();
} else {
session.restoreQueueToInput();
}

return { success: true, data: undefined };
} catch (error) {
Expand Down
68 changes: 68 additions & 0 deletions tests/ipcMain/queuedMessages.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,74 @@ describeIntegration("IpcMain queuedMessages integration tests", () => {
20000
);

test.concurrent(
"should send queued message immediately when sendQueuedImmediately is true",
async () => {
const { env, workspaceId, cleanup } = await setupWorkspace("anthropic");
try {
// Start a stream
void sendMessageWithModel(
env.mockIpcRenderer,
workspaceId,
"Count to 10 slowly",
modelString("anthropic", "claude-haiku-4-5")
);

const collector = createEventCollector(env.sentEvents, workspaceId);
await collector.waitForEvent("stream-start", 5000);

// Queue a message
await sendMessageWithModel(
env.mockIpcRenderer,
workspaceId,
"This message should be sent immediately",
modelString("anthropic", "claude-haiku-4-5")
);

// Verify message was queued
const queued = await getQueuedMessages(collector);
expect(queued).toEqual(["This message should be sent immediately"]);

// Interrupt the stream with sendQueuedImmediately flag
const interruptResult = await env.mockIpcRenderer.invoke(
IPC_CHANNELS.WORKSPACE_INTERRUPT_STREAM,
workspaceId,
{ sendQueuedImmediately: true }
);
expect(interruptResult.success).toBe(true);

// Wait for stream abort
await collector.waitForEvent("stream-abort", 5000);

// Should NOT get restore-to-input event (message is sent, not restored)
// Instead, we should see the queued message being sent as a new user message
const autoSendHappened = await waitFor(() => {
collector.collect();
const userMessages = collector
.getEvents()
.filter((e) => "role" in e && e.role === "user");
return userMessages.length === 2; // First + immediately sent
}, 5000);
expect(autoSendHappened).toBe(true);

// Verify queue was cleared
const queuedAfter = await getQueuedMessages(collector);
expect(queuedAfter).toEqual([]);

// Clear events to track second stream separately
env.sentEvents.length = 0;

// Wait for the immediately-sent message's stream
const collector2 = createEventCollector(env.sentEvents, workspaceId);
await collector2.waitForEvent("stream-start", 5000);
await collector2.waitForEvent("stream-end", 15000);
} finally {
await cleanup();
}
},
30000
);

test.concurrent(
"should combine multiple queued messages with newline separator",
async () => {
Expand Down