Skip to content

Commit a50a5f5

Browse files
committed
A couple of devin improvements and adding an in memory cache for the env queue size check
1 parent a0f94ff commit a50a5f5

File tree

7 files changed

+146
-53
lines changed

7 files changed

+146
-53
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,8 +533,10 @@ const EnvironmentSchema = z
533533
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
534534
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
535535

536-
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
536+
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
537537
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
538+
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(30_000), // 30 seconds
539+
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
538540
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
539541
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
540542

apps/webapp/app/presenters/v3/EnvironmentQueuePresenter.server.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { env } from "~/env.server";
21
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
32
import { marqs } from "~/v3/marqs/index.server";
43
import { engine } from "~/v3/runEngine.server";
4+
import { getQueueSizeLimit } from "~/v3/utils/queueLimits.server";
55
import { BasePresenter } from "./basePresenter.server";
66

77
export type Environment = {
@@ -41,10 +41,7 @@ export class EnvironmentQueuePresenter extends BasePresenter {
4141
throw new Error("Organization not found");
4242
}
4343

44-
const queueSizeLimit =
45-
environment.type === "DEVELOPMENT"
46-
? (organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE ?? null)
47-
: (organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE ?? null);
44+
const queueSizeLimit = getQueueSizeLimit(environment.type, organization);
4845

4946
return {
5047
running,

apps/webapp/app/presenters/v3/LimitsPresenter.server.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Ratelimit } from "@upstash/ratelimit";
2+
import { RuntimeEnvironmentType } from "@trigger.dev/database";
23
import { createHash } from "node:crypto";
34
import { env } from "~/env.server";
45
import { getCurrentPlan } from "~/services/platform.v3.server";
@@ -13,6 +14,7 @@ import { singleton } from "~/utils/singleton";
1314
import { logger } from "~/services/logger.server";
1415
import { CheckScheduleService } from "~/v3/services/checkSchedule.server";
1516
import { engine } from "~/v3/runEngine.server";
17+
import { getQueueSizeLimit, getQueueSizeLimitSource } from "~/v3/utils/queueLimits.server";
1618

1719
// Create a singleton Redis client for rate limit queries
1820
const rateLimitRedisClient = singleton("rateLimitQueryRedisClient", () =>
@@ -84,11 +86,13 @@ export class LimitsPresenter extends BasePresenter {
8486
organizationId,
8587
projectId,
8688
environmentId,
89+
environmentType,
8790
environmentApiKey,
8891
}: {
8992
organizationId: string;
9093
projectId: string;
9194
environmentId: string;
95+
environmentType: RuntimeEnvironmentType;
9296
environmentApiKey: string;
9397
}): Promise<LimitsResult> {
9498
// Get organization with all limit-related fields
@@ -168,13 +172,11 @@ export class LimitsPresenter extends BasePresenter {
168172
);
169173

170174
// Get current queue size for this environment
175+
// We need the runtime environment fields for the engine query
171176
const runtimeEnv = await this._replica.runtimeEnvironment.findFirst({
172177
where: { id: environmentId },
173178
select: {
174179
id: true,
175-
type: true,
176-
organizationId: true,
177-
projectId: true,
178180
maximumConcurrencyLimit: true,
179181
concurrencyLimitBurstFactor: true,
180182
},
@@ -184,11 +186,11 @@ export class LimitsPresenter extends BasePresenter {
184186
if (runtimeEnv) {
185187
const engineEnv = {
186188
id: runtimeEnv.id,
187-
type: runtimeEnv.type,
189+
type: environmentType,
188190
maximumConcurrencyLimit: runtimeEnv.maximumConcurrencyLimit,
189191
concurrencyLimitBurstFactor: runtimeEnv.concurrencyLimitBurstFactor,
190-
organization: { id: runtimeEnv.organizationId },
191-
project: { id: runtimeEnv.projectId },
192+
organization: { id: organizationId },
193+
project: { id: projectId },
192194
};
193195
currentQueueSize = (await engine.lengthOfEnvQueue(engineEnv)) ?? 0;
194196
}
@@ -311,21 +313,9 @@ export class LimitsPresenter extends BasePresenter {
311313
queueSize: {
312314
name: "Max queued runs",
313315
description: "Maximum pending runs across all queues in this environment",
314-
limit:
315-
runtimeEnv?.type === "DEVELOPMENT"
316-
? (organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE ?? null)
317-
: (organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE ?? null),
316+
limit: getQueueSizeLimit(environmentType, organization),
318317
currentUsage: currentQueueSize,
319-
// "plan" = org has a value (typically set by billing sync)
320-
// "default" = no org value, using env var fallback
321-
source:
322-
runtimeEnv?.type === "DEVELOPMENT"
323-
? organization.maximumDevQueueSize
324-
? "plan"
325-
: "default"
326-
: organization.maximumDeployedQueueSize
327-
? "plan"
328-
: "default",
318+
source: getQueueSizeLimitSource(environmentType, organization),
329319
},
330320
},
331321
features: {

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
8282
organizationId: project.organizationId,
8383
projectId: project.id,
8484
environmentId: environment.id,
85+
environmentType: environment.type,
8586
environmentApiKey: environment.apiKey,
8687
})
8788
);

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -347,25 +347,11 @@ export default function Page() {
347347
title="Queued"
348348
value={environment.queued}
349349
suffix={
350-
environment.queueSizeLimit ? (
351-
<span className="flex items-center gap-1">
352-
<span className="text-text-dimmed">/</span>
353-
<span
354-
className={getQueueUsageColorClass(
355-
environment.queued,
356-
environment.queueSizeLimit
357-
)}
358-
>
359-
{formatNumberCompact(environment.queueSizeLimit)}
360-
</span>
361-
<InfoIconTooltip
362-
content="Maximum pending runs across all queues in this environment"
363-
contentClassName="max-w-xs"
364-
/>
365-
</span>
366-
) : env.paused && environment.queued > 0 ? (
367-
"paused"
368-
) : undefined
350+
<QueuedSuffix
351+
queued={environment.queued}
352+
queueSizeLimit={environment.queueSizeLimit}
353+
isPaused={env.paused}
354+
/>
369355
}
370356
animate
371357
accessory={
@@ -1150,3 +1136,45 @@ function getQueueUsageColorClass(current: number, limit: number | null): string
11501136
if (percentage >= 0.9) return "text-warning";
11511137
return undefined;
11521138
}
1139+
1140+
/**
1141+
* Renders the suffix for the Queued BigNumber, showing:
1142+
* - The limit with usage color and tooltip (if queueSizeLimit is set)
1143+
* - "paused" text (if environment is paused)
1144+
* - Both indicators when applicable
1145+
*/
1146+
function QueuedSuffix({
1147+
queued,
1148+
queueSizeLimit,
1149+
isPaused,
1150+
}: {
1151+
queued: number;
1152+
queueSizeLimit: number | null;
1153+
isPaused: boolean;
1154+
}) {
1155+
const showLimit = queueSizeLimit !== null;
1156+
1157+
if (!showLimit && !isPaused) {
1158+
return null;
1159+
}
1160+
1161+
return (
1162+
<span className="flex items-center gap-1">
1163+
{showLimit && (
1164+
<>
1165+
<span className="text-text-dimmed">/</span>
1166+
<span className={getQueueUsageColorClass(queued, queueSizeLimit)}>
1167+
{formatNumberCompact(queueSizeLimit)}
1168+
</span>
1169+
<InfoIconTooltip
1170+
content="Maximum pending runs across all queues in this environment"
1171+
contentClassName="max-w-xs"
1172+
/>
1173+
</>
1174+
)}
1175+
{isPaused && (
1176+
<span className="text-warning">{showLimit ? "(paused)" : "paused"}</span>
1177+
)}
1178+
</span>
1179+
);
1180+
}

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,22 @@ import type { RunEngine } from "~/v3/runEngine.server";
1515
import { env } from "~/env.server";
1616
import { tryCatch } from "@trigger.dev/core/v3";
1717
import { ServiceValidationError } from "~/v3/services/common.server";
18+
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
19+
import { singleton } from "~/utils/singleton";
20+
21+
// LRU cache for environment queue sizes to reduce Redis calls
22+
const queueSizeCache = singleton("queueSizeCache", () => {
23+
const ctx = new DefaultStatefulContext();
24+
const memory = createLRUMemoryStore(env.QUEUE_SIZE_CACHE_MAX_SIZE, "queue-size-cache");
25+
26+
return createCache({
27+
queueSize: new Namespace<number>(ctx, {
28+
stores: [memory],
29+
fresh: env.QUEUE_SIZE_CACHE_TTL_MS,
30+
stale: env.QUEUE_SIZE_CACHE_TTL_MS + 1000,
31+
}),
32+
});
33+
});
1834

1935
/**
2036
* Extract the queue name from a queue option that may be:
@@ -49,7 +65,7 @@ export class DefaultQueueManager implements QueueManager {
4965
constructor(
5066
private readonly prisma: PrismaClientOrTransaction,
5167
private readonly engine: RunEngine
52-
) {}
68+
) { }
5369

5470
async resolveQueueProperties(
5571
request: TriggerTaskRequest,
@@ -75,8 +91,7 @@ export class DefaultQueueManager implements QueueManager {
7591

7692
if (!specifiedQueue) {
7793
throw new ServiceValidationError(
78-
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${
79-
lockedBackgroundWorker.version ?? "<unknown>"
94+
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${lockedBackgroundWorker.version ?? "<unknown>"
8095
}'.`
8196
);
8297
}
@@ -98,8 +113,7 @@ export class DefaultQueueManager implements QueueManager {
98113

99114
if (!lockedTask) {
100115
throw new ServiceValidationError(
101-
`Task '${request.taskId}' not found on locked version '${
102-
lockedBackgroundWorker.version ?? "<unknown>"
116+
`Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
103117
}'.`
104118
);
105119
}
@@ -113,8 +127,7 @@ export class DefaultQueueManager implements QueueManager {
113127
version: lockedBackgroundWorker.version,
114128
});
115129
throw new ServiceValidationError(
116-
`Default queue configuration for task '${request.taskId}' missing on locked version '${
117-
lockedBackgroundWorker.version ?? "<unknown>"
130+
`Default queue configuration for task '${request.taskId}' missing on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
118131
}'.`
119132
);
120133
}
@@ -282,7 +295,7 @@ async function guardQueueSizeLimitsForEnv(
282295
return { isWithinLimits: true };
283296
}
284297

285-
const queueSize = await engine.lengthOfEnvQueue(environment);
298+
const queueSize = await getCachedQueueSize(engine, environment);
286299
const projectedSize = queueSize + itemsToAdd;
287300

288301
return {
@@ -291,3 +304,14 @@ async function guardQueueSizeLimitsForEnv(
291304
queueSize,
292305
};
293306
}
307+
308+
async function getCachedQueueSize(
309+
engine: RunEngine,
310+
environment: AuthenticatedEnvironment
311+
): Promise<number> {
312+
const result = await queueSizeCache.queueSize.swr(environment.id, async () => {
313+
return engine.lengthOfEnvQueue(environment);
314+
});
315+
316+
return result.val ?? 0;
317+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { RuntimeEnvironmentType } from "@trigger.dev/database";
2+
import { env } from "~/env.server";
3+
4+
/**
5+
* Organization fields needed for queue limit calculation.
6+
*/
7+
export type QueueLimitOrganization = {
8+
maximumDevQueueSize: number | null;
9+
maximumDeployedQueueSize: number | null;
10+
};
11+
12+
/**
13+
* Calculates the queue size limit for an environment based on its type and organization settings.
14+
*
15+
* Resolution order:
16+
* 1. Organization-level override (set by billing sync or admin)
17+
* 2. Environment variable fallback
18+
* 3. null if neither is set
19+
*
20+
* @param environmentType - The type of the runtime environment
21+
* @param organization - Organization with queue limit fields
22+
* @returns The queue size limit, or null if unlimited
23+
*/
24+
export function getQueueSizeLimit(
25+
environmentType: RuntimeEnvironmentType,
26+
organization: QueueLimitOrganization
27+
): number | null {
28+
if (environmentType === "DEVELOPMENT") {
29+
return organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE ?? null;
30+
}
31+
32+
return organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE ?? null;
33+
}
34+
35+
/**
36+
* Determines the source of the queue size limit for display purposes.
37+
*
38+
* @param environmentType - The type of the runtime environment
39+
* @param organization - Organization with queue limit fields
40+
* @returns "plan" if org has a value (typically set by billing), "default" if using env var fallback
41+
*/
42+
export function getQueueSizeLimitSource(
43+
environmentType: RuntimeEnvironmentType,
44+
organization: QueueLimitOrganization
45+
): "plan" | "default" {
46+
if (environmentType === "DEVELOPMENT") {
47+
return organization.maximumDevQueueSize !== null ? "plan" : "default";
48+
}
49+
50+
return organization.maximumDeployedQueueSize !== null ? "plan" : "default";
51+
}

0 commit comments

Comments
 (0)