Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,8 @@ export class WorkspaceStore {
createdAt: string
): StreamingMessageAggregator {
if (!this.aggregators.has(workspaceId)) {
// Create new aggregator with required createdAt
this.aggregators.set(workspaceId, new StreamingMessageAggregator(createdAt));
// Create new aggregator with required createdAt and workspaceId for localStorage persistence
this.aggregators.set(workspaceId, new StreamingMessageAggregator(createdAt, workspaceId));
this.workspaceCreatedAt.set(workspaceId, createdAt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,73 @@ describe("StreamingMessageAggregator - Agent Status", () => {
expect(finalStatus?.message).toBe("Tests passed");
expect(finalStatus?.url).toBe(testUrl); // URL from previous stream persists!
});

it("should persist URL across multiple assistant messages when loading from history", () => {
// Regression test: URL should persist even when only the most recent assistant message
// has a status_set without a URL - the URL from an earlier message should be used
const aggregator = new StreamingMessageAggregator("2024-01-01T00:00:00.000Z");
const testUrl = "https://github.com/owner/repo/pull/123";

// Historical messages: first assistant sets URL, second assistant updates status without URL
const historicalMessages = [
{
id: "user1",
role: "user" as const,
parts: [{ type: "text" as const, text: "Make a PR" }],
metadata: { timestamp: 1000, historySequence: 1 },
},
{
id: "assistant1",
role: "assistant" as const,
parts: [
{
type: "dynamic-tool" as const,
toolName: "status_set",
toolCallId: "tool1",
state: "output-available" as const,
input: { emoji: "🔗", message: "PR submitted", url: testUrl },
output: { success: true, emoji: "🔗", message: "PR submitted", url: testUrl },
timestamp: 1001,
tokens: 10,
},
],
metadata: { timestamp: 1001, historySequence: 2 },
},
{
id: "user2",
role: "user" as const,
parts: [{ type: "text" as const, text: "Continue" }],
metadata: { timestamp: 2000, historySequence: 3 },
},
{
id: "assistant2",
role: "assistant" as const,
parts: [
{
type: "dynamic-tool" as const,
toolName: "status_set",
toolCallId: "tool2",
state: "output-available" as const,
input: { emoji: "✅", message: "Tests passed" },
output: { success: true, emoji: "✅", message: "Tests passed" }, // No URL!
timestamp: 2001,
tokens: 10,
},
],
metadata: { timestamp: 2001, historySequence: 4 },
},
];

aggregator.loadHistoricalMessages(historicalMessages);

const status = aggregator.getAgentStatus();
expect(status?.emoji).toBe("✅");
expect(status?.message).toBe("Tests passed");
// URL from the first assistant message should persist!
expect(status?.url).toBe(testUrl);
});

// Note: URL persistence through compaction is handled via localStorage,
// which is tested in integration tests. The aggregator saves lastStatusUrl
// to localStorage when it changes, and loads it on construction.
});
81 changes: 73 additions & 8 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ export class StreamingMessageAggregator {
private agentStatus: { emoji: string; message: string; url?: string } | undefined = undefined;

// Last URL set via status_set - persists even when agentStatus is cleared
// This ensures URL stays available across stream boundaries
// This ensures URL stays available across stream boundaries and through compaction
// Persisted to localStorage keyed by workspaceId
private lastStatusUrl: string | undefined = undefined;

// Workspace ID for localStorage persistence
private readonly workspaceId: string | undefined;

// Workspace init hook state (ephemeral, not persisted to history)
private initState: {
status: "running" | "success" | "error";
Expand All @@ -111,10 +115,47 @@ export class StreamingMessageAggregator {
// REQUIRED: Backend guarantees every workspace has createdAt via config.ts
private readonly createdAt: string;

constructor(createdAt: string) {
constructor(createdAt: string, workspaceId?: string) {
this.createdAt = createdAt;
this.workspaceId = workspaceId;
// Load persisted lastStatusUrl from localStorage
if (workspaceId) {
this.lastStatusUrl = this.loadLastStatusUrl();
}
this.updateRecency();
}

/** localStorage key for persisting lastStatusUrl. Only call when workspaceId is defined. */
private getStatusUrlKey(): string | undefined {
if (!this.workspaceId) return undefined;
return `mux:workspace:${this.workspaceId}:lastStatusUrl`;
}

/** Load lastStatusUrl from localStorage */
private loadLastStatusUrl(): string | undefined {
const key = this.getStatusUrlKey();
if (!key) return undefined;
try {
const stored = localStorage.getItem(key);
return stored ?? undefined;
} catch {
return undefined;
}
}

/**
* Persist lastStatusUrl to localStorage.
* Once set, the URL can only be replaced with a new URL, never deleted.
*/
private saveLastStatusUrl(url: string): void {
const key = this.getStatusUrlKey();
if (!key) return;
try {
localStorage.setItem(key, url);
} catch {
// Ignore localStorage errors
}
}
private invalidateCache(): void {
this.cachedAllMessages = null;
this.cachedDisplayedMessages = null;
Expand Down Expand Up @@ -232,16 +273,39 @@ export class StreamingMessageAggregator {
this.messages.set(message.id, message);
}

// Then, reconstruct derived state from the most recent assistant message
// Use "streaming" context if there's an active stream (reconnection), otherwise "historical"
const context = hasActiveStream ? "streaming" : "historical";

const sortedMessages = [...messages].sort(
(a, b) => (b.metadata?.historySequence ?? 0) - (a.metadata?.historySequence ?? 0)
// Sort messages in chronological order for processing
const chronologicalMessages = [...messages].sort(
(a, b) => (a.metadata?.historySequence ?? 0) - (b.metadata?.historySequence ?? 0)
);

// Find the most recent assistant message
const lastAssistantMessage = sortedMessages.find((msg) => msg.role === "assistant");
// First pass: scan all messages to build up lastStatusUrl from tool calls
// This ensures URL persistence works even if the URL was set in an earlier message
// Also persists to localStorage for future loads (survives compaction)
for (const message of chronologicalMessages) {
if (message.role === "assistant") {
for (const part of message.parts) {
if (
isDynamicToolPart(part) &&
part.state === "output-available" &&
part.toolName === "status_set" &&
hasSuccessResult(part.output)
) {
const result = part.output as Extract<StatusSetToolResult, { success: true }>;
if (result.url) {
this.lastStatusUrl = result.url;
this.saveLastStatusUrl(result.url);
}
}
}
}
}

// Second pass: reconstruct derived state from the most recent assistant message only
// (TODOs and agentStatus should reflect only the latest state)
const lastAssistantMessage = chronologicalMessages.findLast((msg) => msg.role === "assistant");

if (lastAssistantMessage) {
// Process all tool results from the most recent assistant message
Expand Down Expand Up @@ -577,9 +641,10 @@ export class StreamingMessageAggregator {
if (toolName === "status_set" && hasSuccessResult(output)) {
const result = output as Extract<StatusSetToolResult, { success: true }>;

// Update lastStatusUrl if a new URL is provided
// Update lastStatusUrl if a new URL is provided, and persist to localStorage
if (result.url) {
this.lastStatusUrl = result.url;
this.saveLastStatusUrl(result.url);
}

// Use the provided URL, or fall back to the last URL ever set
Expand Down