From eec253687e133ad44928016f8a84623a1141cabc Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 13 Oct 2025 12:16:15 +0100 Subject: [PATCH 01/13] feat(queues): add ability to override concurrency limit via API and dashboard --- .../v3/QueueListPresenter.server.ts | 19 ++ .../v3/QueueRetrievePresenter.server.ts | 79 +++++- .../route.tsx | 259 +++++++++++++++++- ...queues.$queueParam.concurrency.override.ts | 75 +++++ ...v1.queues.$queueParam.concurrency.reset.ts | 75 +++++ .../v3/services/concurrencySystem.server.ts | 209 ++++++++++++++ .../concurrencySystemInstance.server.ts | 15 + .../app/v3/services/pauseQueue.server.ts | 3 + .../migration.sql | 2 + .../migration.sql | 3 + .../migration.sql | 2 + .../database/prisma/schema.prisma | 11 +- packages/core/src/v3/apiClient/index.ts | 47 ++++ packages/core/src/v3/schemas/queues.ts | 19 +- packages/trigger-sdk/src/v3/queues.ts | 74 +++++ references/hello-world/src/trigger/queues.ts | 71 ++++- 16 files changed, 935 insertions(+), 28 deletions(-) create mode 100644 apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts create mode 100644 apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts create mode 100644 apps/webapp/app/v3/services/concurrencySystem.server.ts create mode 100644 apps/webapp/app/v3/services/concurrencySystemInstance.server.ts create mode 100644 internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql create mode 100644 internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql diff --git a/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts b/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts index ec60db8b92..0fe9e3f365 100644 --- a/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts @@ -114,6 +114,9 @@ export class QueueListPresenter extends BasePresenter { name: true, orderableName: true, concurrencyLimit: true, + concurrencyLimitBase: true, + concurrencyLimitOverriddenAt: true, + concurrencyLimitOverriddenBy: true, type: true, paused: true, }, @@ -135,6 +138,17 @@ export class QueueListPresenter extends BasePresenter { ), ]); + // Manually "join" the overridden users because there is no way to implement the relationship + // in prisma without adding a foreign key constraint + const overriddenByIds = queues.map((q) => q.concurrencyLimitOverriddenBy).filter(Boolean); + const overriddenByUsers = await this._replica.user.findMany({ + where: { + id: { in: overriddenByIds }, + }, + }); + + const overriddenByMap = new Map(overriddenByUsers.map((u) => [u.id, u])); + // Transform queues to include running and queued counts return queues.map((queue) => toQueueItem({ @@ -144,6 +158,11 @@ export class QueueListPresenter extends BasePresenter { running: results[1][queue.name] ?? 0, queued: results[0][queue.name] ?? 0, concurrencyLimit: queue.concurrencyLimit ?? null, + concurrencyLimitBase: queue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy + ? overriddenByMap.get(queue.concurrencyLimitOverriddenBy) ?? null + : null, paused: queue.paused, }) ); diff --git a/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts b/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts index 409c14d545..bd885ea738 100644 --- a/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts +++ b/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts @@ -1,12 +1,18 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { engine } from "~/v3/runEngine.server"; import { BasePresenter } from "./basePresenter.server"; -import { type TaskQueueType } from "@trigger.dev/database"; +import { TaskQueue, User, type TaskQueueType } from "@trigger.dev/database"; import { assertExhaustive } from "@trigger.dev/core"; import { determineEngineVersion } from "~/v3/engineVersion.server"; -import { type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3"; +import { type Prettify, type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3"; import { PrismaClientOrTransaction } from "@trigger.dev/database"; +export type FoundQueue = Prettify< + Omit & { + concurrencyLimitOverriddenBy?: User | null; + } +>; + /** * Shared queue lookup logic used by both QueueRetrievePresenter and PauseQueueService */ @@ -16,22 +22,50 @@ export async function getQueue( queue: RetrieveQueueParam ) { if (typeof queue === "string") { - return prismaClient.taskQueue.findFirst({ + return joinQueueWithUser( + prismaClient, + await prismaClient.taskQueue.findFirst({ + where: { + friendlyId: queue, + runtimeEnvironmentId: environment.id, + }, + }) + ); + } + + const queueName = + queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; + return joinQueueWithUser( + prismaClient, + await prismaClient.taskQueue.findFirst({ where: { - friendlyId: queue, + name: queueName, runtimeEnvironmentId: environment.id, }, - }); + }) + ); +} + +async function joinQueueWithUser( + prismaClient: PrismaClientOrTransaction, + queue?: TaskQueue | null +): Promise { + if (!queue) return undefined; + if (!queue.concurrencyLimitOverriddenBy) { + return { + ...queue, + concurrencyLimitOverriddenBy: undefined, + }; } - const queueName = - queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; - return prismaClient.taskQueue.findFirst({ - where: { - name: queueName, - runtimeEnvironmentId: environment.id, - }, + const user = await prismaClient.user.findFirst({ + where: { id: queue.concurrencyLimitOverriddenBy }, }); + + return { + ...queue, + concurrencyLimitOverriddenBy: user, + }; } export class QueueRetrievePresenter extends BasePresenter { @@ -75,6 +109,9 @@ export class QueueRetrievePresenter extends BasePresenter { running: results[1]?.[queue.name] ?? 0, queued: results[0]?.[queue.name] ?? 0, concurrencyLimit: queue.concurrencyLimit ?? null, + concurrencyLimitBase: queue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null, paused: queue.paused, }), }; @@ -104,6 +141,9 @@ export function toQueueItem(data: { running: number; queued: number; concurrencyLimit: number | null; + concurrencyLimitBase: number | null; + concurrencyLimitOverriddenAt: Date | null; + concurrencyLimitOverriddenBy: User | null; paused: boolean; }): QueueItem & { releaseConcurrencyOnWaitpoint: boolean } { return { @@ -113,9 +153,22 @@ export function toQueueItem(data: { type: queueTypeFromType(data.type), running: data.running, queued: data.queued, - concurrencyLimit: data.concurrencyLimit, paused: data.paused, + concurrencyLimit: data.concurrencyLimit, + concurrency: { + current: data.concurrencyLimit, + base: data.concurrencyLimitBase, + override: data.concurrencyLimitOverriddenAt ? data.concurrencyLimit : null, + overriddenBy: toQueueConcurrencyOverriddenBy(data.concurrencyLimitOverriddenBy), + overriddenAt: data.concurrencyLimitOverriddenAt, + }, // TODO: This needs to be removed but keeping this here for now to avoid breaking existing clients releaseConcurrencyOnWaitpoint: true, }; } + +function toQueueConcurrencyOverriddenBy(user: User | null) { + if (!user) return null; + + return user.displayName ?? user.name ?? null; +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 80d6855ce4..03a6617bee 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -6,6 +6,7 @@ import { PauseIcon, PlayIcon, RectangleStackIcon, + WrenchScrewdriverIcon, } from "@heroicons/react/20/solid"; import { DialogClose } from "@radix-ui/react-dialog"; import { Form, useNavigation, useSearchParams, type MetaFunction } from "@remix-run/react"; @@ -61,6 +62,8 @@ import { cn } from "~/utils/cn"; import { docsPath, EnvironmentParamSchema, v3BillingPath, v3RunsPath } from "~/utils/pathBuilder"; import { PauseEnvironmentService } from "~/v3/services/pauseEnvironment.server"; import { PauseQueueService } from "~/v3/services/pauseQueue.server"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; +import { getUserById } from "~/models/user.server"; import { useCurrentPlan } from "../_app.orgs.$organizationSlug/route"; import { Header3 } from "~/components/primitives/Headers"; import { Input } from "~/components/primitives/Input"; @@ -208,6 +211,75 @@ export const action = async ({ request, params }: ActionFunctionArgs) => { `Queue ${action === "queue-pause" ? "paused" : "resumed"}` ); } + case "queue-override": { + const friendlyId = formData.get("friendlyId"); + const concurrencyLimit = formData.get("concurrencyLimit"); + + if (!friendlyId) { + return redirectWithErrorMessage(redirectPath, request, "Queue ID is required"); + } + + if (!concurrencyLimit) { + return redirectWithErrorMessage(redirectPath, request, "Concurrency limit is required"); + } + + const limitNumber = parseInt(concurrencyLimit.toString(), 10); + if (isNaN(limitNumber) || limitNumber < 0) { + return redirectWithErrorMessage( + redirectPath, + request, + "Concurrency limit must be a valid number" + ); + } + + const user = await getUserById(userId); + if (!user) { + return redirectWithErrorMessage(redirectPath, request, "User not found"); + } + + const result = await concurrencySystem.queues.overrideQueueConcurrencyLimit( + environment, + friendlyId.toString(), + limitNumber, + user + ); + + if (!result.isOk()) { + return redirectWithErrorMessage( + redirectPath, + request, + "Failed to override queue concurrency limit" + ); + } + + return redirectWithSuccessMessage( + redirectPath, + request, + "Queue concurrency limit overridden" + ); + } + case "queue-remove-override": { + const friendlyId = formData.get("friendlyId"); + + if (!friendlyId) { + return redirectWithErrorMessage(redirectPath, request, "Queue ID is required"); + } + + const result = await concurrencySystem.queues.resetConcurrencyLimit( + environment, + friendlyId.toString() + ); + + if (!result.isOk()) { + return redirectWithErrorMessage( + redirectPath, + request, + "Failed to reset queue concurrency limit" + ); + } + + return redirectWithSuccessMessage(redirectPath, request, "Queue concurrency limit reset"); + } default: return redirectWithErrorMessage(redirectPath, request, "Something went wrong"); } @@ -375,7 +447,8 @@ export default function Page() { Name Queued - Running/limit + Running + Limit +
+ Overridden + + This queue's concurrency limit has been manually overridden from the + dashboard or API. + +
} > @@ -415,7 +499,7 @@ export default function Page() { {queues.length > 0 ? ( queues.map((queue) => { const limit = queue.concurrencyLimit ?? environment.concurrencyLimit; - const isAtLimit = queue.running === limit; + const isAtLimit = queue.running >= limit; const queueFilterableName = `${queue.type === "task" ? "task/" : ""}${ queue.name }`; @@ -472,24 +556,37 @@ export default function Page() { 0 && "text-text-bright", isAtLimit && "text-warning" )} > - {queue.running}/ - - {limit} - + {queue.running} + {limit} + + - {queue.concurrencyLimit ? "User" : "Environment"} + {queue.concurrency?.overriddenAt + ? "Overridden" + : queue.concurrencyLimit + ? "User" + : "Environment"} )} + - +
{hasFilters @@ -806,6 +908,143 @@ function QueuePauseResumeButton({ ); } +function QueueOverrideConcurrencyButton({ + queue, + environmentConcurrencyLimit, + variant = "minimal/small", + fullWidth = false, +}: { + queue: { + id: string; + name: string; + concurrencyLimit: number | null; + concurrency?: { overriddenAt: Date | null }; + }; + environmentConcurrencyLimit: number; + variant?: ButtonVariant; + fullWidth?: boolean; +}) { + const navigation = useNavigation(); + const [isOpen, setIsOpen] = useState(false); + const [concurrencyLimit, setConcurrencyLimit] = useState( + queue.concurrencyLimit?.toString() ?? environmentConcurrencyLimit.toString() + ); + + const isOverridden = !!queue.concurrency?.overriddenAt; + const currentLimit = queue.concurrencyLimit ?? environmentConcurrencyLimit; + + useEffect(() => { + if (navigation.state === "loading" || navigation.state === "idle") { + setIsOpen(false); + } + }, [navigation.state]); + + const isLoading = Boolean( + navigation.formData?.get("action") === "queue-override" || + navigation.formData?.get("action") === "queue-remove-override" + ); + + return ( + + + + + + + {isOverridden ? "Edit concurrency override" : "Override concurrency limit"} + +
+ {isOverridden ? ( + <> + + This queue's concurrency limit is currently overridden to {currentLimit}. + {queue.concurrencyLimit !== null && + ` The original limit set in code was ${queue.concurrencyLimit}.`} + + + You can update the override or remove it to restore the{" "} + {queue.concurrencyLimit !== null + ? "limit set in code" + : "environment concurrency limit"} + . + + + ) : ( + + Override this queue's concurrency limit. The current limit is {currentLimit}{" "} + {queue.concurrencyLimit !== null ? "(set in code)" : "(from environment)"}. + + )} +
setIsOpen(false)} className="space-y-3"> + +
+ + setConcurrencyLimit(e.target.value)} + placeholder={currentLimit.toString()} + autoFocus + /> +
+
+
+ {isOverridden && ( + + )} +
+ : WrenchScrewdriverIcon} + shortcut={{ modifiers: ["mod"], key: "enter" }} + > + {isOverridden ? "Update override" : "Override limit"} + + } + cancelButton={ + + + + } + /> +
+
+
+
+
+ ); +} + function EngineVersionUpgradeCallout() { return (
diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts new file mode 100644 index 0000000000..3223a2a606 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts @@ -0,0 +1,75 @@ +import { json } from "@remix-run/server-runtime"; +import { type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; + +const BodySchema = z.object({ + type: RetrieveQueueType.default("id"), + concurrencyLimit: z.number().int().min(0).max(100000), +}); + +export const { action } = createActionApiRoute( + { + body: BodySchema, + params: z.object({ + queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")), + }), + }, + async ({ params, body, authentication }) => { + const input: RetrieveQueueParam = + body.type === "id" + ? params.queueParam + : { + type: body.type, + name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"), + }; + + return concurrencySystem.queues + .overrideQueueConcurrencyLimit(authentication.environment, input, body.concurrencyLimit) + .match( + (queue) => { + return json( + toQueueItem({ + friendlyId: queue.friendlyId, + name: queue.name, + type: queue.type, + running: queue.running, + queued: queue.queued, + concurrencyLimit: queue.concurrencyLimit, + concurrencyLimitBase: queue.concurrencyLimitBase, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt, + concurrencyLimitOverriddenBy: null, + paused: queue.paused, + }), + { status: 200 } + ); + }, + (error) => { + switch (error.type) { + case "queue_not_found": { + return json({ error: "Queue not found" }, { status: 404 }); + } + case "queue_update_failed": { + return json({ error: "Failed to update queue concurrency limit" }, { status: 500 }); + } + case "sync_queue_concurrency_to_engine_failed": { + return json( + { error: "Failed to sync queue concurrency limit to engine" }, + { status: 500 } + ); + } + case "get_queue_stats_failed": { + return json({ error: "Failed to get queue stats" }, { status: 500 }); + } + case "other": + default: { + error.type satisfies "other"; + return json({ error: "Internal server error" }, { status: 500 }); + } + } + } + ); + } +); diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts new file mode 100644 index 0000000000..dbeea591ad --- /dev/null +++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts @@ -0,0 +1,75 @@ +import { json } from "@remix-run/server-runtime"; +import { type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; + +const BodySchema = z.object({ + type: RetrieveQueueType.default("id"), +}); + +export const { action } = createActionApiRoute( + { + body: BodySchema, + params: z.object({ + queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")), + }), + }, + async ({ params, body, authentication }) => { + const input: RetrieveQueueParam = + body.type === "id" + ? params.queueParam + : { + type: body.type, + name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"), + }; + + return concurrencySystem.queues.resetConcurrencyLimit(authentication.environment, input).match( + (queue) => { + return json( + toQueueItem({ + friendlyId: queue.friendlyId, + name: queue.name, + type: queue.type, + running: queue.running, + queued: queue.queued, + concurrencyLimit: queue.concurrencyLimit, + concurrencyLimitBase: queue.concurrencyLimitBase, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt, + concurrencyLimitOverriddenBy: null, + paused: queue.paused, + }), + { status: 200 } + ); + }, + (error) => { + switch (error.type) { + case "queue_not_found": { + return json({ error: "Queue not found" }, { status: 404 }); + } + case "queue_not_overridden": { + return json({ error: "Queue is not overridden" }, { status: 400 }); + } + case "queue_update_failed": { + return json({ error: "Failed to update queue concurrency limit" }, { status: 500 }); + } + case "sync_queue_concurrency_to_engine_failed": { + return json( + { error: "Failed to sync queue concurrency limit to engine" }, + { status: 500 } + ); + } + case "get_queue_stats_failed": { + return json({ error: "Failed to get queue stats" }, { status: 500 }); + } + case "other": + default: { + error.type satisfies "other"; + return json({ error: "Internal server error" }, { status: 500 }); + } + } + } + ); + } +); diff --git a/apps/webapp/app/v3/services/concurrencySystem.server.ts b/apps/webapp/app/v3/services/concurrencySystem.server.ts new file mode 100644 index 0000000000..8ef33e4792 --- /dev/null +++ b/apps/webapp/app/v3/services/concurrencySystem.server.ts @@ -0,0 +1,209 @@ +import { TaskQueue, User } from "@trigger.dev/database"; +import { errAsync, fromPromise, okAsync } from "neverthrow"; +import { PrismaClientOrTransaction } from "~/db.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { removeQueueConcurrencyLimits, updateQueueConcurrencyLimits } from "../runQueue.server"; +import { engine } from "../runEngine.server"; + +export type ConcurrencySystemOptions = { + db: PrismaClientOrTransaction; + reader: PrismaClientOrTransaction; +}; + +export type QueueInput = string | { type: "task" | "custom"; name: string }; + +export class ConcurrencySystem { + constructor(private readonly options: ConcurrencySystemOptions) {} + + private get db() { + return this.options.db; + } + + get queues() { + return { + overrideQueueConcurrencyLimit: ( + environment: AuthenticatedEnvironment, + queue: QueueInput, + concurrencyLimit: number, + overriddenBy?: User + ) => { + return findQueueFromInput(this.db, environment, queue) + .andThen((queue) => + overrideQueueConcurrencyLimit( + this.db, + environment, + queue, + concurrencyLimit, + overriddenBy + ) + ) + .andThen((queue) => syncQueueConcurrencyToEngine(environment, queue)) + .andThen((queue) => getQueueStats(environment, queue)); + }, + resetConcurrencyLimit: (environment: AuthenticatedEnvironment, queue: QueueInput) => { + return findQueueFromInput(this.db, environment, queue) + .andThen((queue) => resetQueueConcurrencyLimit(this.db, queue)) + .andThen((queue) => syncQueueConcurrencyToEngine(environment, queue)) + .andThen((queue) => getQueueStats(environment, queue)); + }, + }; + } +} + +function findQueueFromInput( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: QueueInput +) { + if (typeof queue === "string") { + return findQueueByFriendlyId(db, environment, queue); + } + + const queueName = + queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; + + return findQueueByName(db, environment, queueName); +} + +function findQueueByFriendlyId( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + friendlyId: string +) { + return fromPromise( + db.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + friendlyId, + }, + }), + (error) => ({ + type: "other" as const, + cause: error, + }) + ).andThen((queue) => { + if (!queue) { + return errAsync({ type: "queue_not_found" as const }); + } + return okAsync(queue); + }); +} + +function findQueueByName( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: string +) { + return fromPromise( + db.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + name: queue, + }, + }), + (error) => ({ + type: "other" as const, + cause: error, + }) + ).andThen((queue) => { + if (!queue) { + return errAsync({ type: "queue_not_found" as const }); + } + return okAsync(queue); + }); +} + +function overrideQueueConcurrencyLimit( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: TaskQueue, + concurrencyLimit: number, + overriddenBy?: User +) { + const newConcurrencyLimit = Math.max( + Math.min( + concurrencyLimit, + environment.maximumConcurrencyLimit, + environment.organization.maximumConcurrencyLimit + ), + 0 + ); + + const concurrencyLimitBase = queue.concurrencyLimitOverriddenAt + ? queue.concurrencyLimitBase + : queue.concurrencyLimit; + + return fromPromise( + db.taskQueue.update({ + where: { + id: queue.id, + }, + data: { + concurrencyLimit: newConcurrencyLimit, + concurrencyLimitBase: concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: new Date(), + concurrencyLimitOverriddenBy: overriddenBy?.id ?? null, + }, + }), + (error) => ({ + type: "queue_update_failed" as const, + cause: error, + }) + ); +} + +function resetQueueConcurrencyLimit(db: PrismaClientOrTransaction, queue: TaskQueue) { + if (queue.concurrencyLimitOverriddenAt === null) { + return errAsync({ type: "queue_not_overridden" as const }); + } + + const newConcurrencyLimit = queue.concurrencyLimitBase; + + return fromPromise( + db.taskQueue.update({ + where: { id: queue.id }, + data: { + concurrencyLimitOverriddenAt: null, + concurrencyLimit: newConcurrencyLimit, + concurrencyLimitBase: null, + concurrencyLimitOverriddenBy: null, + }, + }), + (error) => ({ + type: "queue_update_failed" as const, + cause: error, + }) + ); +} + +function syncQueueConcurrencyToEngine(environment: AuthenticatedEnvironment, queue: TaskQueue) { + if (typeof queue.concurrencyLimit === "number") { + return fromPromise( + updateQueueConcurrencyLimits(environment, queue.name, queue.concurrencyLimit), + (error) => ({ + type: "sync_queue_concurrency_to_engine_failed" as const, + cause: error, + }) + ).andThen(() => okAsync(queue)); + } else { + return fromPromise(removeQueueConcurrencyLimits(environment, queue.name), (error) => ({ + type: "sync_queue_concurrency_to_engine_failed" as const, + cause: error, + })).andThen(() => okAsync(queue)); + } +} + +function getQueueStats(environment: AuthenticatedEnvironment, queue: TaskQueue) { + return fromPromise( + Promise.all([ + engine.lengthOfQueues(environment, [queue.name]), + engine.currentConcurrencyOfQueues(environment, [queue.name]), + ]), + (error) => ({ + type: "get_queue_stats_failed" as const, + cause: error, + }) + ).andThen(([queued, running]) => + okAsync({ queued: queued[queue.name] ?? 0, running: running[queue.name] ?? 0, ...queue }) + ); +} diff --git a/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts b/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts new file mode 100644 index 0000000000..fec48f2750 --- /dev/null +++ b/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts @@ -0,0 +1,15 @@ +import { prisma, $replica } from "~/db.server"; +import { ConcurrencySystem } from "./concurrencySystem.server"; +import { singleton } from "~/utils/singleton"; + +export const concurrencySystem = singleton( + "concurrency-system", + initalizeConcurrencySystemInstance +); + +function initalizeConcurrencySystemInstance() { + return new ConcurrencySystem({ + db: prisma, + reader: $replica, + }); +} diff --git a/apps/webapp/app/v3/services/pauseQueue.server.ts b/apps/webapp/app/v3/services/pauseQueue.server.ts index f4e18eab4b..b5ba5f399a 100644 --- a/apps/webapp/app/v3/services/pauseQueue.server.ts +++ b/apps/webapp/app/v3/services/pauseQueue.server.ts @@ -88,6 +88,9 @@ export class PauseQueueService extends BaseService { running: results[1]?.[updatedQueue.name] ?? 0, queued: results[0]?.[updatedQueue.name] ?? 0, concurrencyLimit: updatedQueue.concurrencyLimit ?? null, + concurrencyLimitBase: updatedQueue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: updatedQueue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null, paused: updatedQueue.paused, }), }; diff --git a/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql b/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql new file mode 100644 index 0000000000..825ef02b8d --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitBase" INTEGER; \ No newline at end of file diff --git a/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql b/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql new file mode 100644 index 0000000000..13e5af08f5 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitOverriddenAt" TIMESTAMP(3); + diff --git a/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql b/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql new file mode 100644 index 0000000000..d8a81ce175 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitOverriddenBy" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index af7cc70fe2..105dff4bef 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1482,8 +1482,15 @@ model TaskQueue { runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) runtimeEnvironmentId String - concurrencyLimit Int? - rateLimit Json? + /// Represents the current concurrency limit for the queue + concurrencyLimit Int? + /// When the concurrency limit was overridden + concurrencyLimitOverriddenAt DateTime? + /// Who overrode the concurrency limit (will be null if overridden via the API) + concurrencyLimitOverriddenBy String? + /// If concurrencyLimit is overridden, this is the overridden value + concurrencyLimitBase Int? + rateLimit Json? paused Boolean @default(false) diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index fdd4bfc5e5..7264faa148 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -899,6 +899,53 @@ export class ApiClient { ); } + overrideQueueConcurrencyLimit( + queue: RetrieveQueueParam, + concurrencyLimit: number, + requestOptions?: ZodFetchOptions + ) { + const type = typeof queue === "string" ? "id" : queue.type; + const value = typeof queue === "string" ? queue : queue.name; + + // Explicitly encode slashes before encoding the rest of the string + const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F")); + + return zodfetch( + QueueItem, + `${this.baseUrl}/api/v1/queues/${encodedValue}/concurrency/override`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ + type, + concurrencyLimit, + }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + resetQueueConcurrencyLimit(queue: RetrieveQueueParam, requestOptions?: ZodFetchOptions) { + const type = typeof queue === "string" ? "id" : queue.type; + const value = typeof queue === "string" ? queue : queue.name; + + // Explicitly encode slashes before encoding the rest of the string + const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F")); + + return zodfetch( + QueueItem, + `${this.baseUrl}/api/v1/queues/${encodedValue}/concurrency/reset`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ + type, + }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + subscribeToRun( runId: string, options?: { diff --git a/packages/core/src/v3/schemas/queues.ts b/packages/core/src/v3/schemas/queues.ts index cf42f0ed9a..34a47b34e3 100644 --- a/packages/core/src/v3/schemas/queues.ts +++ b/packages/core/src/v3/schemas/queues.ts @@ -28,10 +28,25 @@ export const QueueItem = z.object({ running: z.number(), /** The number of runs currently queued */ queued: z.number(), - /** The concurrency limit of the queue */ - concurrencyLimit: z.number().nullable(), /** Whether the queue is paused. If it's paused, no new runs will be started. */ paused: z.boolean(), + /** The concurrency limit of the queue */ + concurrencyLimit: z.number().nullable(), + /** The concurrency limit of the queue */ + concurrency: z + .object({ + /** The effective/current concurrency limit */ + current: z.number().nullable(), + /** The base concurrency limit (default) */ + base: z.number().nullable(), + /** The effective/current concurrency limit */ + override: z.number().nullable(), + /** When the override was applied */ + overriddenAt: z.coerce.date().nullable(), + /** Who overrode the concurrency limit (will be null if overridden via the API) */ + overriddenBy: z.string().nullable(), + }) + .optional(), }); export type QueueItem = z.infer; diff --git a/packages/trigger-sdk/src/v3/queues.ts b/packages/trigger-sdk/src/v3/queues.ts index 6788186f9d..f05a81ab68 100644 --- a/packages/trigger-sdk/src/v3/queues.ts +++ b/packages/trigger-sdk/src/v3/queues.ts @@ -132,6 +132,80 @@ export function pause( return apiClient.pauseQueue(queue, "pause", $requestOptions); } +/** + * Overrides the concurrency limit of a queue. + * + * @param queue - The ID of the queue to override the concurrency limit, or the type and name + * @param concurrencyLimit - The concurrency limit to override + * @returns The updated queue state + */ +export function overrideConcurrencyLimit( + queue: RetrieveQueueParam, + concurrencyLimit: number, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "queues.overrideConcurrencyLimit()", + icon: "queue", + attributes: { + ...flattenAttributes({ queue }), + ...accessoryAttributes({ + items: [ + { + text: typeof queue === "string" ? queue : queue.name, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.overrideQueueConcurrencyLimit(queue, concurrencyLimit, $requestOptions); +} + +/** + * Resets the concurrency limit of a queue to the base value. + * + * @param queue - The ID of the queue to reset the concurrency limit, or the type and name + * @returns The updated queue state + */ +export function resetConcurrencyLimit( + queue: RetrieveQueueParam, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "queues.resetConcurrencyLimit()", + icon: "queue", + attributes: { + ...flattenAttributes({ queue }), + ...accessoryAttributes({ + items: [ + { + text: typeof queue === "string" ? queue : queue.name, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.resetQueueConcurrencyLimit(queue, $requestOptions); +} + /** * Resumes a paused queue, allowing new runs to be started. * diff --git a/references/hello-world/src/trigger/queues.ts b/references/hello-world/src/trigger/queues.ts index 7eb05c99b4..69c4216b97 100644 --- a/references/hello-world/src/trigger/queues.ts +++ b/references/hello-world/src/trigger/queues.ts @@ -1,4 +1,4 @@ -import { batch, logger, queue, queues, task, tasks } from "@trigger.dev/sdk/v3"; +import { batch, logger, queue, queues, runs, task, tasks } from "@trigger.dev/sdk/v3"; export const queuesTester = task({ id: "queues-tester", @@ -37,6 +37,22 @@ export const queuesTester = task({ name: "queues-tester", }); logger.log("Resumed queue", { resumedQueue }); + + const overriddenQueue = await queues.overrideConcurrencyLimit( + { + type: "task", + name: "queues-tester", + }, + 10 + ); + logger.log("Overridden queue", { overriddenQueue }); + + const resetQueue = await queues.resetConcurrencyLimit({ + type: "task", + name: "queues-tester", + }); + + logger.log("Reset queue", { resetQueue }); }, }); @@ -262,3 +278,56 @@ export const lockedTaskIdentifierTask = task({ await tasks.triggerAndWait("task_does_not_exist", {}); }, }); + +export const testQueuesConcurrencyOverrideTask = task({ + id: "test-queues-concurrency-override-task", + run: async (payload: any) => { + logger.log("Test queues concurrency override task", { payload }); + + const handle1 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + const handle2 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + + const result1 = await runs.poll(handle1.id); + const result2 = await runs.poll(handle2.id); + + logger.log("Test queues concurrency override task results", { result1, result2 }); + + const handle3 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + const handle4 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + + const overriddenQueue = await queues.overrideConcurrencyLimit( + { + type: "custom", + name: "test-queues-concurrency-override-queue", + }, + 2 + ); + + logger.log("Overridden queue", { overriddenQueue }); + + const result3 = await runs.poll(handle3.id); + const result4 = await runs.poll(handle4.id); + + logger.log("Test queues concurrency override task results", { result3, result4 }); + + const resetQueue = await queues.resetConcurrencyLimit({ + type: "custom", + name: "test-queues-concurrency-override-queue", + }); + + logger.log("Reset queue", { resetQueue }); + }, +}); + +export const testQueuesConcurrencyOverrideChildTask = task({ + id: "test-queues-concurrency-override-child-task", + queue: { + name: "test-queues-concurrency-override-queue", + concurrencyLimit: 1, + }, + run: async (payload: any) => { + logger.log("Test queues concurrency override child task", { payload }); + + await setTimeout(10000); + }, +}); From ff424154e58122e5e7cae585d1935613d534d354 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 14:26:46 +0100 Subject: [PATCH 02/13] Updates the modal layout and tweaks copy --- .../route.tsx | 93 +++++++++---------- 1 file changed, 44 insertions(+), 49 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 03a6617bee..318fd8ee17 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -964,24 +964,20 @@ function QueueOverrideConcurrencyButton({
{isOverridden ? ( - <> - - This queue's concurrency limit is currently overridden to {currentLimit}. - {queue.concurrencyLimit !== null && - ` The original limit set in code was ${queue.concurrencyLimit}.`} - - - You can update the override or remove it to restore the{" "} - {queue.concurrencyLimit !== null - ? "limit set in code" - : "environment concurrency limit"} - . - - + + This queue's concurrency limit is currently overridden to {currentLimit}. + {queue.concurrencyLimit !== null && + ` The original limit set in code was ${queue.concurrencyLimit}.`}{" "} + You can update the override or remove it to restore the{" "} + {queue.concurrencyLimit !== null + ? "limit set in code" + : "environment concurrency limit"} + . + ) : ( - - Override this queue's concurrency limit. The current limit is {currentLimit}{" "} - {queue.concurrencyLimit !== null ? "(set in code)" : "(from environment)"}. + + Override this queue's concurrency limit. The current limit is {currentLimit}, which is + set {queue.concurrencyLimit !== null ? "in code" : "by the environment"}. )}
setIsOpen(false)} className="space-y-3"> @@ -1001,43 +997,42 @@ function QueueOverrideConcurrencyButton({ autoFocus />
-
-
- {isOverridden && ( - - )} -
- : WrenchScrewdriverIcon} - shortcut={{ modifiers: ["mod"], key: "enter" }} - > - {isOverridden ? "Update override" : "Override limit"} - - } - cancelButton={ + + } + shortcut={{ modifiers: ["mod"], key: "enter" }} + > + {isOverridden ? "Update override" : "Override limit"} + + } + cancelButton={ +
+ {isOverridden && ( + + )} - } - /> -
+
+ } + />
From d18db784b913147ac389fa7a86a549c6b916fb08 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 14:59:38 +0100 Subject: [PATCH 03/13] Improves the dropdown menu item --- .../route.tsx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 318fd8ee17..91dd4b0d74 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -951,11 +951,10 @@ function QueueOverrideConcurrencyButton({ type="button" variant={variant} LeadingIcon={WrenchScrewdriverIcon} - leadingIconClassName={isOverridden ? "text-indigo-500" : "text-dimmed"} fullWidth={fullWidth} textAlignLeft={fullWidth} > - {isOverridden ? "Edit override..." : "Override limit..."} + {isOverridden ? "Edit override…" : "Override limit…"} From 5993ea179f412f9f52e76c2822e2b98b8e042343 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 15:47:13 +0100 Subject: [PATCH 04/13] Popover supports both Button and LinkButton --- .../app/components/primitives/Popover.tsx | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/apps/webapp/app/components/primitives/Popover.tsx b/apps/webapp/app/components/primitives/Popover.tsx index 22156927f3..f679d03c9b 100644 --- a/apps/webapp/app/components/primitives/Popover.tsx +++ b/apps/webapp/app/components/primitives/Popover.tsx @@ -7,7 +7,7 @@ import * as React from "react"; import { DropdownIcon } from "~/assets/icons/DropdownIcon"; import * as useShortcutKeys from "~/hooks/useShortcutKeys"; import { cn } from "~/utils/cn"; -import { type ButtonContentPropsType, LinkButton } from "./Buttons"; +import { type ButtonContentPropsType, Button, LinkButton } from "./Buttons"; import { Paragraph, type ParagraphVariant } from "./Paragraph"; import { ShortcutKey } from "./ShortcutKey"; import { type RenderIcon } from "./Icon"; @@ -60,32 +60,45 @@ function PopoverMenuItem({ variant = { variant: "small-menu-item" }, leadingIconClassName, className, + onClick, + disabled, }: { - to: string; + to?: string; icon?: RenderIcon; title: React.ReactNode; isSelected?: boolean; variant?: ButtonContentPropsType; leadingIconClassName?: string; className?: string; + onClick?: React.MouseEventHandler; + disabled?: boolean; }) { + const commonProps = { + variant: variant.variant, + LeadingIcon: icon, + leadingIconClassName, + fullWidth: true, + textAlignLeft: true, + TrailingIcon: isSelected ? CheckIcon : undefined, + className: cn( + "group-hover:bg-charcoal-700", + isSelected ? "bg-charcoal-750 group-hover:bg-charcoal-600/50" : undefined, + className + ), + } as const; + + if (to) { + return ( + + {title} + + ); + } + return ( - + ); } From c05f60bf38c8c2b320443f097f41e467b97f5ce5 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 15:53:24 +0100 Subject: [PATCH 05/13] Right align the columns and fix the dropdown menu item styles --- .../route.tsx | 130 +++++++++--------- 1 file changed, 64 insertions(+), 66 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 91dd4b0d74..0e0c4b50dc 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -1,4 +1,5 @@ import { + AdjustmentsHorizontalIcon, ArrowUpCircleIcon, BookOpenIcon, ChatBubbleLeftEllipsisIcon, @@ -71,6 +72,7 @@ import { useThrottle } from "~/hooks/useThrottle"; import { RunsIcon } from "~/assets/icons/RunsIcon"; import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; import { env } from "~/env.server"; +import { PopoverMenuItem } from "~/components/primitives/Popover"; const SearchParamsSchema = z.object({ query: z.string().optional(), @@ -475,7 +477,7 @@ export default function Page() {
- Overridden + Override {queue.queued} 0 && "text-text-bright", isAtLimit && "text-warning" @@ -567,9 +572,9 @@ export default function Page() { {limit} @@ -577,16 +582,22 @@ export default function Page() { - {queue.concurrency?.overriddenAt - ? "Overridden" - : queue.concurrencyLimit - ? "User" - : "Environment"} + {queue.concurrency?.overriddenAt ? ( +
+ + Override +
+ ) : queue.concurrencyLimit ? ( + "User" + ) : ( + "Environment" + )}
)} - - - View all runs - - + + - View queued runs - - + - View running runs - + /> } /> @@ -827,29 +827,26 @@ function QueuePauseResumeButton({ fullWidth?: boolean; showTooltip?: boolean; }) { - const navigation = useNavigation(); const [isOpen, setIsOpen] = useState(false); - const button = ( - - ); - const trigger = showTooltip ? (
- {button} + + +
@@ -861,7 +858,13 @@ function QueuePauseResumeButton({
) : ( - {button} + + + ); return ( @@ -947,15 +950,10 @@ function QueueOverrideConcurrencyButton({ return ( - + From b2fd391f79cc14ddab0fb1198781a8f503c8d8c2 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:07:09 +0100 Subject: [PATCH 06/13] Organize imports, --- .../route.tsx | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 0e0c4b50dc..b29d2780d9 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -7,7 +7,6 @@ import { PauseIcon, PlayIcon, RectangleStackIcon, - WrenchScrewdriverIcon, } from "@heroicons/react/20/solid"; import { DialogClose } from "@radix-ui/react-dialog"; import { Form, useNavigation, useSearchParams, type MetaFunction } from "@remix-run/react"; @@ -16,6 +15,7 @@ import type { RuntimeEnvironmentType } from "@trigger.dev/database"; import { useEffect, useState } from "react"; import { typedjson, useTypedLoaderData } from "remix-typedjson"; import { z } from "zod"; +import { RunsIcon } from "~/assets/icons/RunsIcon"; import { TaskIconSmall } from "~/assets/icons/TaskIcon"; import upgradeForQueuesPath from "~/assets/images/queues-dashboard.png"; import { AdminDebugTooltip } from "~/components/admin/debugTooltip"; @@ -25,13 +25,16 @@ import { Feedback } from "~/components/Feedback"; import { PageBody, PageContainer } from "~/components/layout/AppLayout"; import { BigNumber } from "~/components/metrics/BigNumber"; import { Badge } from "~/components/primitives/Badge"; -import { Button, type ButtonVariant, LinkButton } from "~/components/primitives/Buttons"; +import { Button, LinkButton, type ButtonVariant } from "~/components/primitives/Buttons"; import { Callout } from "~/components/primitives/Callout"; import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "~/components/primitives/Dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; +import { Header3 } from "~/components/primitives/Headers"; +import { Input } from "~/components/primitives/Input"; import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; import { PaginationControls } from "~/components/primitives/Pagination"; import { Paragraph } from "~/components/primitives/Paragraph"; +import { PopoverMenuItem } from "~/components/primitives/Popover"; import { Spinner } from "~/components/primitives/Spinner"; import { Table, @@ -50,29 +53,25 @@ import { TooltipProvider, TooltipTrigger, } from "~/components/primitives/Tooltip"; +import { env } from "~/env.server"; +import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; +import { useThrottle } from "~/hooks/useThrottle"; import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getUserById } from "~/models/user.server"; import { EnvironmentQueuePresenter } from "~/presenters/v3/EnvironmentQueuePresenter.server"; import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; import { docsPath, EnvironmentParamSchema, v3BillingPath, v3RunsPath } from "~/utils/pathBuilder"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; import { PauseEnvironmentService } from "~/v3/services/pauseEnvironment.server"; import { PauseQueueService } from "~/v3/services/pauseQueue.server"; -import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; -import { getUserById } from "~/models/user.server"; import { useCurrentPlan } from "../_app.orgs.$organizationSlug/route"; -import { Header3 } from "~/components/primitives/Headers"; -import { Input } from "~/components/primitives/Input"; -import { useThrottle } from "~/hooks/useThrottle"; -import { RunsIcon } from "~/assets/icons/RunsIcon"; -import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; -import { env } from "~/env.server"; -import { PopoverMenuItem } from "~/components/primitives/Popover"; const SearchParamsSchema = z.object({ query: z.string().optional(), From 79a4c6cfac8aa1e4916acedada22ed8d1f886430 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:07:22 +0100 Subject: [PATCH 07/13] Fix spinner icon in dropdown menu --- .../route.tsx | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index b29d2780d9..f66f38cda3 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -646,8 +646,8 @@ export default function Page() { })} /> } From d989d9d3ba4c98cc145862f23fb6b284ecf38a42 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:07:33 +0100 Subject: [PATCH 08/13] Remove unused props --- .../route.tsx | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index f66f38cda3..d4596451fc 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -912,8 +912,6 @@ function QueuePauseResumeButton({ function QueueOverrideConcurrencyButton({ queue, environmentConcurrencyLimit, - variant = "minimal/small", - fullWidth = false, }: { queue: { id: string; @@ -922,8 +920,6 @@ function QueueOverrideConcurrencyButton({ concurrency?: { overriddenAt: Date | null }; }; environmentConcurrencyLimit: number; - variant?: ButtonVariant; - fullWidth?: boolean; }) { const navigation = useNavigation(); const [isOpen, setIsOpen] = useState(false); From 0be502d8bd526d6a3d4994748d200089dad82c84 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:26:41 +0100 Subject: [PATCH 09/13] Adds a tooltip to the Concurrency override badge --- .../route.tsx | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index d4596451fc..d51fe42a01 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -536,6 +536,18 @@ export default function Page() { {queue.name} + {queue.concurrency?.overriddenAt ? ( + + Concurrency overriden + + } + content="This queue's concurrency limit has been manually overridden from the dashboard or API." + className="max-w-xs" + disableHoverableContent + /> + ) : null} {queue.paused ? ( Paused @@ -588,10 +600,7 @@ export default function Page() { )} > {queue.concurrency?.overriddenAt ? ( -
- - Override -
+ Override ) : queue.concurrencyLimit ? ( "User" ) : ( From 457aafe9b3f152eb20c908ca4ec2350b6d88dc73 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:43:34 +0100 Subject: [PATCH 10/13] Fixes console error with popover menu --- .../app/components/primitives/Popover.tsx | 116 +++++++++++------- 1 file changed, 70 insertions(+), 46 deletions(-) diff --git a/apps/webapp/app/components/primitives/Popover.tsx b/apps/webapp/app/components/primitives/Popover.tsx index f679d03c9b..7bdf9e902e 100644 --- a/apps/webapp/app/components/primitives/Popover.tsx +++ b/apps/webapp/app/components/primitives/Popover.tsx @@ -5,9 +5,10 @@ import { EllipsisVerticalIcon } from "@heroicons/react/24/solid"; import * as PopoverPrimitive from "@radix-ui/react-popover"; import * as React from "react"; import { DropdownIcon } from "~/assets/icons/DropdownIcon"; +import { Link } from "@remix-run/react"; import * as useShortcutKeys from "~/hooks/useShortcutKeys"; import { cn } from "~/utils/cn"; -import { type ButtonContentPropsType, Button, LinkButton } from "./Buttons"; +import { type ButtonContentPropsType, Button, ButtonContent } from "./Buttons"; import { Paragraph, type ParagraphVariant } from "./Paragraph"; import { ShortcutKey } from "./ShortcutKey"; import { type RenderIcon } from "./Icon"; @@ -52,55 +53,78 @@ function PopoverSectionHeader({ ); } -function PopoverMenuItem({ - to, - icon, - title, - isSelected, - variant = { variant: "small-menu-item" }, - leadingIconClassName, - className, - onClick, - disabled, -}: { - to?: string; - icon?: RenderIcon; - title: React.ReactNode; - isSelected?: boolean; - variant?: ButtonContentPropsType; - leadingIconClassName?: string; - className?: string; - onClick?: React.MouseEventHandler; - disabled?: boolean; -}) { - const commonProps = { - variant: variant.variant, - LeadingIcon: icon, - leadingIconClassName, - fullWidth: true, - textAlignLeft: true, - TrailingIcon: isSelected ? CheckIcon : undefined, - className: cn( - "group-hover:bg-charcoal-700", - isSelected ? "bg-charcoal-750 group-hover:bg-charcoal-600/50" : undefined, - className - ), - } as const; +const PopoverMenuItem = React.forwardRef< + HTMLButtonElement | HTMLAnchorElement, + { + to?: string; + icon?: RenderIcon; + title: React.ReactNode; + isSelected?: boolean; + variant?: ButtonContentPropsType; + leadingIconClassName?: string; + className?: string; + onClick?: React.MouseEventHandler; + disabled?: boolean; + } +>( + ( + { + to, + icon, + title, + isSelected, + variant = { variant: "small-menu-item" }, + leadingIconClassName, + className, + onClick, + disabled, + }, + ref + ) => { + const contentProps = { + variant: variant.variant, + LeadingIcon: icon, + leadingIconClassName, + fullWidth: true, + textAlignLeft: true, + TrailingIcon: isSelected ? CheckIcon : undefined, + className: cn( + "group-hover:bg-charcoal-700", + isSelected ? "bg-charcoal-750 group-hover:bg-charcoal-600/50" : undefined, + className + ), + } as const; + + if (to) { + return ( + } + className={cn("group/button focus-custom", contentProps.fullWidth ? "w-full" : "")} + onClick={onClick as any} + > + {title} + + ); + } - if (to) { return ( - - {title} - + ); } - - return ( - - ); -} +); +PopoverMenuItem.displayName = "PopoverMenuItem"; function PopoverCustomTrigger({ isOpen, From aa7e51fb303eb1ea8160063ba145fbb743ce2a4d Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:47:29 +0100 Subject: [PATCH 11/13] typo --- .../route.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index d51fe42a01..0400afcfdb 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -540,7 +540,7 @@ export default function Page() { - Concurrency overriden + Concurrency limit overridden
} content="This queue's concurrency limit has been manually overridden from the dashboard or API." From 91c14946eae48b519258f89137bfae282030f9f4 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 16:56:54 +0100 Subject: [PATCH 12/13] Fixes incorrect className --- apps/webapp/app/components/primitives/Buttons.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/webapp/app/components/primitives/Buttons.tsx b/apps/webapp/app/components/primitives/Buttons.tsx index 6859608eef..d6e4d0ac45 100644 --- a/apps/webapp/app/components/primitives/Buttons.tsx +++ b/apps/webapp/app/components/primitives/Buttons.tsx @@ -276,7 +276,7 @@ export function ButtonContent(props: ButtonContentPropsType) { {buttonContent} - + {tooltip} {shortcut && renderShortcutKey()} From b993e9f50b8f7ec14e3155bff618e36c4dffe019 Mon Sep 17 00:00:00 2001 From: James Ritchie Date: Wed, 15 Oct 2025 17:06:44 +0100 Subject: [PATCH 13/13] Minimal buttons to view runs --- .../route.tsx | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 0400afcfdb..3a6b3189e4 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -340,17 +340,19 @@ export default function Page() { animate accessory={
+ {environment.runsEnabled ? : null} - View runs - - {environment.runsEnabled ? : null} + tooltip="View queued runs" + />
} valueClassName={env.paused ? "text-warning" : undefined} @@ -373,15 +375,17 @@ export default function Page() { } accessory={ - View runs - + tooltip="View runs" + /> } compactThreshold={1000000} />