From 1ac86b64d64bf656253986a74e5e8884f9aa31cd Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 29 Jan 2026 16:49:53 +0000 Subject: [PATCH 1/3] Type inference for the throttle function --- apps/webapp/app/utils/throttle.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/utils/throttle.ts b/apps/webapp/app/utils/throttle.ts index a6c1a77a32..5b264ebd86 100644 --- a/apps/webapp/app/utils/throttle.ts +++ b/apps/webapp/app/utils/throttle.ts @@ -1,13 +1,13 @@ //From: https://kettanaito.com/blog/debounce-vs-throttle /** A very simple throttle. Will execute the function at the end of each period and discard any other calls during that period. */ -export function throttle( - func: (...args: any[]) => void, +export function throttle( + func: (...args: TArgs) => void, durationMs: number -): (...args: any[]) => void { +): (...args: TArgs) => void { let isPrimedToFire = false; - return (...args: any[]) => { + return (...args: TArgs) => { if (!isPrimedToFire) { isPrimedToFire = true; From 06fa3ffc99958b86c16c6db0cdaf151968fdf995 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 29 Jan 2026 16:50:46 +0000 Subject: [PATCH 2/3] Live reload the run until the run has been finished for 30s - Send refresh pings every 5s - Throttle so we never update more than once per second - Stop auto-reloading when the run has been completed for >=30s --- .../v3/RunStreamPresenter.server.ts | 59 ++++++++++--------- .../route.tsx | 25 +++++++- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index 9d54020ad2..ae03479729 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -1,12 +1,12 @@ -import { PrismaClient, prisma } from "~/db.server"; +import { type PrismaClient, prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; -import { createSSELoader } from "~/utils/sse"; +import { createSSELoader, SendFunction } from "~/utils/sse"; import { throttle } from "~/utils/throttle"; import { tracePubSub } from "~/v3/services/tracePubSub.server"; -const PING_INTERVAL = 1000; -const STREAM_TIMEOUT = 30 * 1000; // 30 seconds +const PING_INTERVAL = 5_000; +const STREAM_TIMEOUT = 30_000; export class RunStreamPresenter { #prismaClient: PrismaClient; @@ -49,36 +49,40 @@ export class RunStreamPresenter { // Subscribe to trace updates const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId); - // Store throttled send function and message listener for cleanup - let throttledSend: ReturnType | undefined; + // Only send max every 1 second + const throttledSend = throttle( + (args: { send: SendFunction; event?: string; data: string }) => { + try { + args.send(args); + } catch (error) { + if (error instanceof Error) { + if (error.name !== "TypeError") { + logger.debug("Error sending SSE in RunStreamPresenter", { + error: { + name: error.name, + message: error.message, + stack: error.stack, + }, + }); + } + } + // Abort the stream on send error + context.controller.abort("Send error"); + } + }, + 1000 + ); + let messageListener: ((event: string) => void) | undefined; return { initStream: ({ send }) => { // Create throttled send function - throttledSend = throttle((args: { event?: string; data: string }) => { - try { - send(args); - } catch (error) { - if (error instanceof Error) { - if (error.name !== "TypeError") { - logger.debug("Error sending SSE in RunStreamPresenter", { - error: { - name: error.name, - message: error.message, - stack: error.stack, - }, - }); - } - } - // Abort the stream on send error - context.controller.abort("Send error"); - } - }, 1000); + throttledSend({ send, event: "message", data: new Date().toISOString() }); // Set up message listener for pub/sub events messageListener = (event: string) => { - throttledSend?.({ data: event }); + throttledSend({ send, event: "message", data: event }); }; eventEmitter.addListener("message", messageListener); @@ -88,7 +92,8 @@ export class RunStreamPresenter { iterator: ({ send }) => { // Send ping to keep connection alive try { - send({ event: "ping", data: new Date().toISOString() }); + // Send an actual message so the client refreshes + throttledSend({ send, event: "message", data: new Date().toISOString() }); } catch (error) { // If we can't send a ping, the connection is likely dead return false; diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx index 899306eb81..1ffd128b30 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam/route.tsx @@ -436,6 +436,24 @@ export default function Page() { ); } +function shouldLiveReload({ + events, + maximumLiveReloadingSetting, + run, +}: { + events: TraceEvent[]; + maximumLiveReloadingSetting: number; + run: { completedAt: string | null }; +}): boolean { + // We don't live reload if there are a ton of spans/logs + if (events.length > maximumLiveReloadingSetting) return false; + + // If the run was completed a while ago, we don't need to live reload anymore + if (run.completedAt && new Date(run.completedAt).getTime() < Date.now() - 30_000) return false; + + return true; +} + function TraceView({ run, trace, @@ -453,18 +471,19 @@ function TraceView({ const { events, duration, rootSpanStatus, rootStartedAt, queuedDuration, overridesBySpanId } = trace; - const shouldLiveReload = events.length <= maximumLiveReloadingSetting; const changeToSpan = useDebounce((selectedSpan: string) => { replaceSearchParam("span", selectedSpan, { replace: true }); }, 250); + const isLiveReloading = shouldLiveReload({ events, maximumLiveReloadingSetting, run }); + const revalidator = useRevalidator(); const streamedEvents = useEventSource( v3RunStreamingPath(organization, project, environment, run), { event: "message", - disabled: !shouldLiveReload, + disabled: !isLiveReloading, } ); useEffect(() => { @@ -511,7 +530,7 @@ function TraceView({ rootStartedAt={rootStartedAt ? new Date(rootStartedAt) : undefined} queuedDuration={queuedDuration} environmentType={run.environment.type} - shouldLiveReload={shouldLiveReload} + shouldLiveReload={isLiveReloading} maximumLiveReloadingSetting={maximumLiveReloadingSetting} rootRun={run.rootTaskRun} parentRun={run.parentTaskRun} From 16623b8672f0003cc3dd0fbd3a01bc97e112928e Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 29 Jan 2026 21:54:21 +0000 Subject: [PATCH 3/3] Only send the required args --- apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts index ae03479729..1dd4edc623 100644 --- a/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunStreamPresenter.server.ts @@ -53,7 +53,7 @@ export class RunStreamPresenter { const throttledSend = throttle( (args: { send: SendFunction; event?: string; data: string }) => { try { - args.send(args); + args.send({ event: args.event, data: args.data }); } catch (error) { if (error instanceof Error) { if (error.name !== "TypeError") {