Skip to content

Commit d136f35

Browse files
committed
fix: batch async message yields to prevent scroll issues
The oRPC async generator was yielding messages one at a time with async boundaries, causing premature React renders during history replay. This led to scroll-to-bottom firing before all messages loaded. Extract createAsyncMessageQueue utility that yields all queued messages synchronously (no async pauses within a batch). Used by both the real router and storybook mocks to match original IPC callback behavior.
1 parent 8144f9e commit d136f35

File tree

3 files changed

+81
-53
lines changed

3 files changed

+81
-53
lines changed

.storybook/mocks/orpc.ts

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { ProjectConfig } from "@/node/config";
99
import type { WorkspaceChatMessage } from "@/common/orpc/types";
1010
import type { ChatStats } from "@/common/types/chatStats";
1111
import { DEFAULT_RUNTIME_CONFIG } from "@/common/constants/workspace";
12+
import { createAsyncMessageQueue } from "@/common/utils/asyncMessageQueue";
1213

1314
export interface MockORPCClientOptions {
1415
projects?: Map<string, ProjectConfig>;
@@ -136,38 +137,15 @@ export function createMockORPCClient(options: MockORPCClientOptions = {}): ORPCC
136137
return;
137138
}
138139

139-
// Create a queue-based async iterator
140-
const queue: WorkspaceChatMessage[] = [];
141-
let resolveNext: ((msg: WorkspaceChatMessage) => void) | null = null;
142-
let ended = false;
143-
144-
const emit = (msg: WorkspaceChatMessage) => {
145-
if (ended) return;
146-
if (resolveNext) {
147-
const resolve = resolveNext;
148-
resolveNext = null;
149-
resolve(msg);
150-
} else {
151-
queue.push(msg);
152-
}
153-
};
140+
const { push, iterate, end } = createAsyncMessageQueue<WorkspaceChatMessage>();
154141

155142
// Call the user's onChat handler
156-
const cleanup = onChat(input.workspaceId, emit);
143+
const cleanup = onChat(input.workspaceId, push);
157144

158145
try {
159-
while (!ended) {
160-
if (queue.length > 0) {
161-
yield queue.shift()!;
162-
} else {
163-
const msg = await new Promise<WorkspaceChatMessage>((resolve) => {
164-
resolveNext = resolve;
165-
});
166-
yield msg;
167-
}
168-
}
146+
yield* iterate();
169147
} finally {
170-
ended = true;
148+
end();
171149
cleanup?.();
172150
}
173151
},
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Creates a queue-based async message stream.
3+
*
4+
* Messages pushed to the queue are yielded in batches (all queued messages
5+
* at once without async boundaries). This prevents premature React renders
6+
* when loading many messages quickly (e.g., history replay).
7+
*
8+
* Usage:
9+
* ```ts
10+
* const { push, iterate, end } = createAsyncMessageQueue<MyMessage>();
11+
*
12+
* // Push messages from any source
13+
* eventEmitter.on('message', push);
14+
*
15+
* // Consume as async generator
16+
* for await (const msg of iterate()) {
17+
* handleMessage(msg);
18+
* }
19+
*
20+
* // Clean up
21+
* end();
22+
* ```
23+
*/
24+
export function createAsyncMessageQueue<T>(): {
25+
push: (msg: T) => void;
26+
iterate: () => AsyncGenerator<T>;
27+
end: () => void;
28+
} {
29+
const queue: T[] = [];
30+
let resolveNext: (() => void) | null = null;
31+
let ended = false;
32+
33+
const push = (msg: T) => {
34+
if (ended) return;
35+
queue.push(msg);
36+
// Signal that new messages are available
37+
if (resolveNext) {
38+
const resolve = resolveNext;
39+
resolveNext = null;
40+
resolve();
41+
}
42+
};
43+
44+
async function* iterate(): AsyncGenerator<T> {
45+
while (!ended) {
46+
// Yield all queued messages synchronously (no async boundaries)
47+
// This ensures all messages from a batch are processed in the same
48+
// event loop tick, preventing premature renders
49+
while (queue.length > 0) {
50+
yield queue.shift()!;
51+
}
52+
// Wait for more messages
53+
await new Promise<void>((resolve) => {
54+
resolveNext = resolve;
55+
});
56+
}
57+
// Yield any remaining messages after end() is called
58+
while (queue.length > 0) {
59+
yield queue.shift()!;
60+
}
61+
}
62+
63+
const end = () => {
64+
ended = true;
65+
// Wake up the iterator so it can exit
66+
if (resolveNext) {
67+
resolveNext();
68+
}
69+
};
70+
71+
return { push, iterate, end };
72+
}

src/node/orpc/router.ts

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
FrontendWorkspaceMetadataSchemaType,
99
} from "@/common/orpc/types";
1010
import { createAuthMiddleware } from "./authMiddleware";
11+
import { createAsyncMessageQueue } from "@/common/utils/asyncMessageQueue";
1112

1213
export const router = (authToken?: string) => {
1314
const t = os.$context<ORPCContext>().use(createAuthMiddleware(authToken));
@@ -321,21 +322,7 @@ export const router = (authToken?: string) => {
321322
.output(schemas.workspace.onChat.output)
322323
.handler(async function* ({ context, input }) {
323324
const session = context.workspaceService.getOrCreateSession(input.workspaceId);
324-
325-
let resolveNext: ((value: WorkspaceChatMessage) => void) | null = null;
326-
const queue: WorkspaceChatMessage[] = [];
327-
let ended = false;
328-
329-
const push = (msg: WorkspaceChatMessage) => {
330-
if (ended) return;
331-
if (resolveNext) {
332-
const resolve = resolveNext;
333-
resolveNext = null;
334-
resolve(msg);
335-
} else {
336-
queue.push(msg);
337-
}
338-
};
325+
const { push, iterate, end } = createAsyncMessageQueue<WorkspaceChatMessage>();
339326

340327
// 1. Subscribe to new events (including those triggered by replay)
341328
const unsubscribe = session.onChatEvent(({ message }) => {
@@ -353,18 +340,9 @@ export const router = (authToken?: string) => {
353340
push({ type: "caught-up" });
354341

355342
try {
356-
while (!ended) {
357-
if (queue.length > 0) {
358-
yield queue.shift()!;
359-
} else {
360-
const msg = await new Promise<WorkspaceChatMessage>((resolve) => {
361-
resolveNext = resolve;
362-
});
363-
yield msg;
364-
}
365-
}
343+
yield* iterate();
366344
} finally {
367-
ended = true;
345+
end();
368346
unsubscribe();
369347
}
370348
}),

0 commit comments

Comments
 (0)