Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,10 @@ const EnvironmentSchema = z
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB

MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(30_000), // 30 seconds
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
Comment on lines +536 to +539
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Defaulting MAXIMUM_DEV_QUEUE_SIZE changes enforcement behavior.

This turns previously-unlimited dev environments into a hard 500-queue cap (via guardQueueSizeLimitsForEnv). If that’s not intentional, remove the default and require an explicit env var to enable the limit.

💡 Suggested change (avoid unintended hard limit)
-    MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
+    MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional().default(500),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(30_000), // 30 seconds
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(30_000), // 30 seconds
QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
🤖 Prompt for AI Agents
In `@apps/webapp/app/env.server.ts` around lines 536 - 539, The
MAXIMUM_DEV_QUEUE_SIZE default of 500 introduces an unintended hard cap for dev
environments because guardQueueSizeLimitsForEnv reads this value and enforces
it; remove the .default(500) so MAXIMUM_DEV_QUEUE_SIZE remains
optional/undefined unless explicitly set in the environment, leaving the
z.coerce.number().int().optional() schema for MAXIMUM_DEV_QUEUE_SIZE and ensure
any code calling guardQueueSizeLimitsForEnv continues to treat undefined as "no
cap" (verify guardQueueSizeLimitsForEnv behavior and update it only if it
currently treats undefined incorrectly).

MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { marqs } from "~/v3/marqs/index.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit } from "~/v3/utils/queueLimits.server";
import { BasePresenter } from "./basePresenter.server";

export type Environment = {
Expand All @@ -9,6 +10,7 @@ export type Environment = {
concurrencyLimit: number;
burstFactor: number;
runsEnabled: boolean;
queueSizeLimit: number | null;
};

export class EnvironmentQueuePresenter extends BasePresenter {
Expand All @@ -30,19 +32,24 @@ export class EnvironmentQueuePresenter extends BasePresenter {
},
select: {
runsEnabled: true,
maximumDevQueueSize: true,
maximumDeployedQueueSize: true,
},
});

if (!organization) {
throw new Error("Organization not found");
}

const queueSizeLimit = getQueueSizeLimit(environment.type, organization);

return {
running,
queued,
concurrencyLimit: environment.maximumConcurrencyLimit,
burstFactor: environment.concurrencyLimitBurstFactor.toNumber(),
runsEnabled: environment.type === "DEVELOPMENT" || organization.runsEnabled,
queueSizeLimit,
};
}
}
51 changes: 36 additions & 15 deletions apps/webapp/app/presenters/v3/LimitsPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Ratelimit } from "@upstash/ratelimit";
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { createHash } from "node:crypto";
import { env } from "~/env.server";
import { getCurrentPlan } from "~/services/platform.v3.server";
Expand All @@ -12,6 +13,8 @@ import { BasePresenter } from "./basePresenter.server";
import { singleton } from "~/utils/singleton";
import { logger } from "~/services/logger.server";
import { CheckScheduleService } from "~/v3/services/checkSchedule.server";
import { engine } from "~/v3/runEngine.server";
import { getQueueSizeLimit, getQueueSizeLimitSource } from "~/v3/utils/queueLimits.server";

// Create a singleton Redis client for rate limit queries
const rateLimitRedisClient = singleton("rateLimitQueryRedisClient", () =>
Expand Down Expand Up @@ -66,8 +69,7 @@ export type LimitsResult = {
logRetentionDays: QuotaInfo | null;
realtimeConnections: QuotaInfo | null;
batchProcessingConcurrency: QuotaInfo;
devQueueSize: QuotaInfo;
deployedQueueSize: QuotaInfo;
queueSize: QuotaInfo;
};
features: {
hasStagingEnvironment: FeatureInfo;
Expand All @@ -84,11 +86,13 @@ export class LimitsPresenter extends BasePresenter {
organizationId,
projectId,
environmentId,
environmentType,
environmentApiKey,
}: {
organizationId: string;
projectId: string;
environmentId: string;
environmentType: RuntimeEnvironmentType;
environmentApiKey: string;
}): Promise<LimitsResult> {
// Get organization with all limit-related fields
Expand Down Expand Up @@ -167,6 +171,30 @@ export class LimitsPresenter extends BasePresenter {
batchRateLimitConfig
);

// Get current queue size for this environment
// We need the runtime environment fields for the engine query
const runtimeEnv = await this._replica.runtimeEnvironment.findFirst({
where: { id: environmentId },
select: {
id: true,
maximumConcurrencyLimit: true,
concurrencyLimitBurstFactor: true,
},
});

let currentQueueSize = 0;
if (runtimeEnv) {
const engineEnv = {
id: runtimeEnv.id,
type: environmentType,
maximumConcurrencyLimit: runtimeEnv.maximumConcurrencyLimit,
concurrencyLimitBurstFactor: runtimeEnv.concurrencyLimitBurstFactor,
organization: { id: organizationId },
project: { id: projectId },
};
currentQueueSize = (await engine.lengthOfEnvQueue(engineEnv)) ?? 0;
}

// Get plan-level limits
const schedulesLimit = limits?.schedules?.number ?? null;
const teamMembersLimit = limits?.teamMembers?.number ?? null;
Expand Down Expand Up @@ -282,19 +310,12 @@ export class LimitsPresenter extends BasePresenter {
canExceed: true,
isUpgradable: true,
},
devQueueSize: {
name: "Dev queue size",
description: "Maximum pending runs in development environments",
limit: organization.maximumDevQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDevQueueSize ? "override" : "default",
},
deployedQueueSize: {
name: "Deployed queue size",
description: "Maximum pending runs in deployed environments",
limit: organization.maximumDeployedQueueSize ?? null,
currentUsage: 0, // Would need to query Redis for this
source: organization.maximumDeployedQueueSize ? "override" : "default",
queueSize: {
name: "Max queued runs",
description: "Maximum pending runs across all queues in this environment",
limit: getQueueSizeLimit(environmentType, organization),
currentUsage: currentQueueSize,
source: getQueueSizeLimitSource(environmentType, organization),
},
Comment on lines +313 to 319
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Queue-size quota never shows an upgrade action.

isUpgradable is omitted, so the Upgrade column is empty even for plan-based limits. If upgrades should be offered, set it to true (and optionally canExceed).

💡 Suggested change
         queueSize: {
           name: "Max queued runs",
           description: "Maximum pending runs across all queues in this environment",
           limit: getQueueSizeLimit(environmentType, organization),
           currentUsage: currentQueueSize,
           source: getQueueSizeLimitSource(environmentType, organization),
+          isUpgradable: true,
         },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
queueSize: {
name: "Max queued runs",
description: "Maximum pending runs across all queues in this environment",
limit: getQueueSizeLimit(environmentType, organization),
currentUsage: currentQueueSize,
source: getQueueSizeLimitSource(environmentType, organization),
},
queueSize: {
name: "Max queued runs",
description: "Maximum pending runs across all queues in this environment",
limit: getQueueSizeLimit(environmentType, organization),
currentUsage: currentQueueSize,
source: getQueueSizeLimitSource(environmentType, organization),
isUpgradable: true,
},
🤖 Prompt for AI Agents
In `@apps/webapp/app/presenters/v3/LimitsPresenter.server.ts` around lines 313 -
319, The queue-size quota object (queueSize) in LimitsPresenter.server.ts is
missing the isUpgradable flag so the UI never shows an Upgrade action; update
the queueSize payload returned by the presenter to include isUpgradable: true
for plan-based limits (and add canExceed: true|false as appropriate), e.g., set
isUpgradable to true when getQueueSizeLimitSource(environmentType, organization)
indicates a plan-based source and ensure the UI-facing fields
(queueSize.currentUsage, queueSize.limit, queueSize.source) remain unchanged.

},
features: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
organizationId: project.organizationId,
projectId: project.id,
environmentId: environment.id,
environmentType: environment.type,
environmentApiKey: environment.apiKey,
})
);
Expand Down Expand Up @@ -507,9 +508,8 @@ function QuotasSection({
// Include batch processing concurrency
quotaRows.push(quotas.batchProcessingConcurrency);

// Add queue size quotas if set
if (quotas.devQueueSize.limit !== null) quotaRows.push(quotas.devQueueSize);
if (quotas.deployedQueueSize.limit !== null) quotaRows.push(quotas.deployedQueueSize);
// Add queue size quota if set
if (quotas.queueSize.limit !== null) quotaRows.push(quotas.queueSize);

return (
<div className="flex flex-col gap-3">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import { EnvironmentQueuePresenter } from "~/presenters/v3/EnvironmentQueuePrese
import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server";
import { requireUserId } from "~/services/session.server";
import { cn } from "~/utils/cn";
import { formatNumberCompact } from "~/utils/numberFormatter";
import {
concurrencyPath,
docsPath,
Expand Down Expand Up @@ -345,7 +346,13 @@ export default function Page() {
<BigNumber
title="Queued"
value={environment.queued}
suffix={env.paused && environment.queued > 0 ? "paused" : undefined}
suffix={
<QueuedSuffix
queued={environment.queued}
queueSizeLimit={environment.queueSizeLimit}
isPaused={env.paused}
/>
}
animate
accessory={
<div className="flex items-start gap-1">
Expand All @@ -364,7 +371,10 @@ export default function Page() {
/>
</div>
}
valueClassName={env.paused ? "text-warning" : undefined}
valueClassName={
getQueueUsageColorClass(environment.queued, environment.queueSizeLimit) ??
(env.paused ? "text-warning" : undefined)
}
compactThreshold={1000000}
/>
<BigNumber
Expand Down Expand Up @@ -1118,3 +1128,53 @@ function BurstFactorTooltip({
/>
);
}

function getQueueUsageColorClass(current: number, limit: number | null): string | undefined {
if (!limit) return undefined;
const percentage = current / limit;
if (percentage >= 1) return "text-error";
if (percentage >= 0.9) return "text-warning";
return undefined;
}

/**
* Renders the suffix for the Queued BigNumber, showing:
* - The limit with usage color and tooltip (if queueSizeLimit is set)
* - "paused" text (if environment is paused)
* - Both indicators when applicable
*/
function QueuedSuffix({
queued,
queueSizeLimit,
isPaused,
}: {
queued: number;
queueSizeLimit: number | null;
isPaused: boolean;
}) {
const showLimit = queueSizeLimit !== null;

if (!showLimit && !isPaused) {
return null;
}

return (
<span className="flex items-center gap-1">
{showLimit && (
<>
<span className="text-text-dimmed">/</span>
<span className={getQueueUsageColorClass(queued, queueSizeLimit)}>
{formatNumberCompact(queueSizeLimit)}
</span>
<InfoIconTooltip
content="Maximum pending runs across all queues in this environment"
contentClassName="max-w-xs"
/>
</>
)}
{isPaused && (
<span className="text-warning">{showLimit ? "(paused)" : "paused"}</span>
)}
</span>
);
}
40 changes: 32 additions & 8 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ import type { RunEngine } from "~/v3/runEngine.server";
import { env } from "~/env.server";
import { tryCatch } from "@trigger.dev/core/v3";
import { ServiceValidationError } from "~/v3/services/common.server";
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
import { singleton } from "~/utils/singleton";

// LRU cache for environment queue sizes to reduce Redis calls
const queueSizeCache = singleton("queueSizeCache", () => {
const ctx = new DefaultStatefulContext();
const memory = createLRUMemoryStore(env.QUEUE_SIZE_CACHE_MAX_SIZE, "queue-size-cache");

return createCache({
queueSize: new Namespace<number>(ctx, {
stores: [memory],
fresh: env.QUEUE_SIZE_CACHE_TTL_MS,
stale: env.QUEUE_SIZE_CACHE_TTL_MS + 1000,
}),
});
});

/**
* Extract the queue name from a queue option that may be:
Expand Down Expand Up @@ -49,7 +65,7 @@ export class DefaultQueueManager implements QueueManager {
constructor(
private readonly prisma: PrismaClientOrTransaction,
private readonly engine: RunEngine
) {}
) { }

async resolveQueueProperties(
request: TriggerTaskRequest,
Expand All @@ -75,8 +91,7 @@ export class DefaultQueueManager implements QueueManager {

if (!specifiedQueue) {
throw new ServiceValidationError(
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${
lockedBackgroundWorker.version ?? "<unknown>"
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${lockedBackgroundWorker.version ?? "<unknown>"
}'.`
);
}
Expand All @@ -98,8 +113,7 @@ export class DefaultQueueManager implements QueueManager {

if (!lockedTask) {
throw new ServiceValidationError(
`Task '${request.taskId}' not found on locked version '${
lockedBackgroundWorker.version ?? "<unknown>"
`Task '${request.taskId}' not found on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
}'.`
);
}
Expand All @@ -113,8 +127,7 @@ export class DefaultQueueManager implements QueueManager {
version: lockedBackgroundWorker.version,
});
throw new ServiceValidationError(
`Default queue configuration for task '${request.taskId}' missing on locked version '${
lockedBackgroundWorker.version ?? "<unknown>"
`Default queue configuration for task '${request.taskId}' missing on locked version '${lockedBackgroundWorker.version ?? "<unknown>"
}'.`
);
}
Expand Down Expand Up @@ -282,7 +295,7 @@ async function guardQueueSizeLimitsForEnv(
return { isWithinLimits: true };
}

const queueSize = await engine.lengthOfEnvQueue(environment);
const queueSize = await getCachedQueueSize(engine, environment);
const projectedSize = queueSize + itemsToAdd;

return {
Expand All @@ -291,3 +304,14 @@ async function guardQueueSizeLimitsForEnv(
queueSize,
};
}

async function getCachedQueueSize(
engine: RunEngine,
environment: AuthenticatedEnvironment
): Promise<number> {
const result = await queueSizeCache.queueSize.swr(environment.id, async () => {
return engine.lengthOfEnvQueue(environment);
});

return result.val ?? 0;
}
51 changes: 51 additions & 0 deletions apps/webapp/app/v3/utils/queueLimits.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { RuntimeEnvironmentType } from "@trigger.dev/database";
import { env } from "~/env.server";

/**
* Organization fields needed for queue limit calculation.
*/
export type QueueLimitOrganization = {
maximumDevQueueSize: number | null;
maximumDeployedQueueSize: number | null;
};

/**
* Calculates the queue size limit for an environment based on its type and organization settings.
*
* Resolution order:
* 1. Organization-level override (set by billing sync or admin)
* 2. Environment variable fallback
* 3. null if neither is set
*
* @param environmentType - The type of the runtime environment
* @param organization - Organization with queue limit fields
* @returns The queue size limit, or null if unlimited
*/
export function getQueueSizeLimit(
environmentType: RuntimeEnvironmentType,
organization: QueueLimitOrganization
): number | null {
if (environmentType === "DEVELOPMENT") {
return organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE ?? null;
}

return organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE ?? null;
}

/**
* Determines the source of the queue size limit for display purposes.
*
* @param environmentType - The type of the runtime environment
* @param organization - Organization with queue limit fields
* @returns "plan" if org has a value (typically set by billing), "default" if using env var fallback
*/
export function getQueueSizeLimitSource(
environmentType: RuntimeEnvironmentType,
organization: QueueLimitOrganization
): "plan" | "default" {
if (environmentType === "DEVELOPMENT") {
return organization.maximumDevQueueSize !== null ? "plan" : "default";
}

return organization.maximumDeployedQueueSize !== null ? "plan" : "default";
}