diff --git a/apps/webapp/app/components/primitives/Buttons.tsx b/apps/webapp/app/components/primitives/Buttons.tsx index 6859608eef..d6e4d0ac45 100644 --- a/apps/webapp/app/components/primitives/Buttons.tsx +++ b/apps/webapp/app/components/primitives/Buttons.tsx @@ -276,7 +276,7 @@ export function ButtonContent(props: ButtonContentPropsType) { {buttonContent} - + {tooltip} {shortcut && renderShortcutKey()} diff --git a/apps/webapp/app/components/primitives/Popover.tsx b/apps/webapp/app/components/primitives/Popover.tsx index 22156927f3..7bdf9e902e 100644 --- a/apps/webapp/app/components/primitives/Popover.tsx +++ b/apps/webapp/app/components/primitives/Popover.tsx @@ -5,9 +5,10 @@ import { EllipsisVerticalIcon } from "@heroicons/react/24/solid"; import * as PopoverPrimitive from "@radix-ui/react-popover"; import * as React from "react"; import { DropdownIcon } from "~/assets/icons/DropdownIcon"; +import { Link } from "@remix-run/react"; import * as useShortcutKeys from "~/hooks/useShortcutKeys"; import { cn } from "~/utils/cn"; -import { type ButtonContentPropsType, LinkButton } from "./Buttons"; +import { type ButtonContentPropsType, Button, ButtonContent } from "./Buttons"; import { Paragraph, type ParagraphVariant } from "./Paragraph"; import { ShortcutKey } from "./ShortcutKey"; import { type RenderIcon } from "./Icon"; @@ -52,42 +53,78 @@ function PopoverSectionHeader({ ); } -function PopoverMenuItem({ - to, - icon, - title, - isSelected, - variant = { variant: "small-menu-item" }, - leadingIconClassName, - className, -}: { - to: string; - icon?: RenderIcon; - title: React.ReactNode; - isSelected?: boolean; - variant?: ButtonContentPropsType; - leadingIconClassName?: string; - className?: string; -}) { - return ( - ( + ( + { + to, + icon, + title, + isSelected, + variant = { variant: "small-menu-item" }, + leadingIconClassName, + className, + onClick, + disabled, + }, + ref + ) => { + const contentProps = { + variant: variant.variant, + LeadingIcon: icon, + leadingIconClassName, + fullWidth: true, + textAlignLeft: true, + TrailingIcon: isSelected ? CheckIcon : undefined, + className: cn( "group-hover:bg-charcoal-700", isSelected ? "bg-charcoal-750 group-hover:bg-charcoal-600/50" : undefined, className - )} - > - {title} - - ); -} + ), + } as const; + + if (to) { + return ( + } + className={cn("group/button focus-custom", contentProps.fullWidth ? "w-full" : "")} + onClick={onClick as any} + > + {title} + + ); + } + + return ( + + ); + } +); +PopoverMenuItem.displayName = "PopoverMenuItem"; function PopoverCustomTrigger({ isOpen, diff --git a/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts b/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts index ec60db8b92..0fe9e3f365 100644 --- a/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/QueueListPresenter.server.ts @@ -114,6 +114,9 @@ export class QueueListPresenter extends BasePresenter { name: true, orderableName: true, concurrencyLimit: true, + concurrencyLimitBase: true, + concurrencyLimitOverriddenAt: true, + concurrencyLimitOverriddenBy: true, type: true, paused: true, }, @@ -135,6 +138,17 @@ export class QueueListPresenter extends BasePresenter { ), ]); + // Manually "join" the overridden users because there is no way to implement the relationship + // in prisma without adding a foreign key constraint + const overriddenByIds = queues.map((q) => q.concurrencyLimitOverriddenBy).filter(Boolean); + const overriddenByUsers = await this._replica.user.findMany({ + where: { + id: { in: overriddenByIds }, + }, + }); + + const overriddenByMap = new Map(overriddenByUsers.map((u) => [u.id, u])); + // Transform queues to include running and queued counts return queues.map((queue) => toQueueItem({ @@ -144,6 +158,11 @@ export class QueueListPresenter extends BasePresenter { running: results[1][queue.name] ?? 0, queued: results[0][queue.name] ?? 0, concurrencyLimit: queue.concurrencyLimit ?? null, + concurrencyLimitBase: queue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy + ? overriddenByMap.get(queue.concurrencyLimitOverriddenBy) ?? null + : null, paused: queue.paused, }) ); diff --git a/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts b/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts index 409c14d545..bd885ea738 100644 --- a/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts +++ b/apps/webapp/app/presenters/v3/QueueRetrievePresenter.server.ts @@ -1,12 +1,18 @@ import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { engine } from "~/v3/runEngine.server"; import { BasePresenter } from "./basePresenter.server"; -import { type TaskQueueType } from "@trigger.dev/database"; +import { TaskQueue, User, type TaskQueueType } from "@trigger.dev/database"; import { assertExhaustive } from "@trigger.dev/core"; import { determineEngineVersion } from "~/v3/engineVersion.server"; -import { type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3"; +import { type Prettify, type QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3"; import { PrismaClientOrTransaction } from "@trigger.dev/database"; +export type FoundQueue = Prettify< + Omit & { + concurrencyLimitOverriddenBy?: User | null; + } +>; + /** * Shared queue lookup logic used by both QueueRetrievePresenter and PauseQueueService */ @@ -16,22 +22,50 @@ export async function getQueue( queue: RetrieveQueueParam ) { if (typeof queue === "string") { - return prismaClient.taskQueue.findFirst({ + return joinQueueWithUser( + prismaClient, + await prismaClient.taskQueue.findFirst({ + where: { + friendlyId: queue, + runtimeEnvironmentId: environment.id, + }, + }) + ); + } + + const queueName = + queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; + return joinQueueWithUser( + prismaClient, + await prismaClient.taskQueue.findFirst({ where: { - friendlyId: queue, + name: queueName, runtimeEnvironmentId: environment.id, }, - }); + }) + ); +} + +async function joinQueueWithUser( + prismaClient: PrismaClientOrTransaction, + queue?: TaskQueue | null +): Promise { + if (!queue) return undefined; + if (!queue.concurrencyLimitOverriddenBy) { + return { + ...queue, + concurrencyLimitOverriddenBy: undefined, + }; } - const queueName = - queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; - return prismaClient.taskQueue.findFirst({ - where: { - name: queueName, - runtimeEnvironmentId: environment.id, - }, + const user = await prismaClient.user.findFirst({ + where: { id: queue.concurrencyLimitOverriddenBy }, }); + + return { + ...queue, + concurrencyLimitOverriddenBy: user, + }; } export class QueueRetrievePresenter extends BasePresenter { @@ -75,6 +109,9 @@ export class QueueRetrievePresenter extends BasePresenter { running: results[1]?.[queue.name] ?? 0, queued: results[0]?.[queue.name] ?? 0, concurrencyLimit: queue.concurrencyLimit ?? null, + concurrencyLimitBase: queue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null, paused: queue.paused, }), }; @@ -104,6 +141,9 @@ export function toQueueItem(data: { running: number; queued: number; concurrencyLimit: number | null; + concurrencyLimitBase: number | null; + concurrencyLimitOverriddenAt: Date | null; + concurrencyLimitOverriddenBy: User | null; paused: boolean; }): QueueItem & { releaseConcurrencyOnWaitpoint: boolean } { return { @@ -113,9 +153,22 @@ export function toQueueItem(data: { type: queueTypeFromType(data.type), running: data.running, queued: data.queued, - concurrencyLimit: data.concurrencyLimit, paused: data.paused, + concurrencyLimit: data.concurrencyLimit, + concurrency: { + current: data.concurrencyLimit, + base: data.concurrencyLimitBase, + override: data.concurrencyLimitOverriddenAt ? data.concurrencyLimit : null, + overriddenBy: toQueueConcurrencyOverriddenBy(data.concurrencyLimitOverriddenBy), + overriddenAt: data.concurrencyLimitOverriddenAt, + }, // TODO: This needs to be removed but keeping this here for now to avoid breaking existing clients releaseConcurrencyOnWaitpoint: true, }; } + +function toQueueConcurrencyOverriddenBy(user: User | null) { + if (!user) return null; + + return user.displayName ?? user.name ?? null; +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx index 80d6855ce4..3a6b3189e4 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx @@ -1,4 +1,5 @@ import { + AdjustmentsHorizontalIcon, ArrowUpCircleIcon, BookOpenIcon, ChatBubbleLeftEllipsisIcon, @@ -14,6 +15,7 @@ import type { RuntimeEnvironmentType } from "@trigger.dev/database"; import { useEffect, useState } from "react"; import { typedjson, useTypedLoaderData } from "remix-typedjson"; import { z } from "zod"; +import { RunsIcon } from "~/assets/icons/RunsIcon"; import { TaskIconSmall } from "~/assets/icons/TaskIcon"; import upgradeForQueuesPath from "~/assets/images/queues-dashboard.png"; import { AdminDebugTooltip } from "~/components/admin/debugTooltip"; @@ -23,13 +25,16 @@ import { Feedback } from "~/components/Feedback"; import { PageBody, PageContainer } from "~/components/layout/AppLayout"; import { BigNumber } from "~/components/metrics/BigNumber"; import { Badge } from "~/components/primitives/Badge"; -import { Button, type ButtonVariant, LinkButton } from "~/components/primitives/Buttons"; +import { Button, LinkButton, type ButtonVariant } from "~/components/primitives/Buttons"; import { Callout } from "~/components/primitives/Callout"; import { Dialog, DialogContent, DialogHeader, DialogTrigger } from "~/components/primitives/Dialog"; import { FormButtons } from "~/components/primitives/FormButtons"; +import { Header3 } from "~/components/primitives/Headers"; +import { Input } from "~/components/primitives/Input"; import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; import { PaginationControls } from "~/components/primitives/Pagination"; import { Paragraph } from "~/components/primitives/Paragraph"; +import { PopoverMenuItem } from "~/components/primitives/Popover"; import { Spinner } from "~/components/primitives/Spinner"; import { Table, @@ -48,26 +53,25 @@ import { TooltipProvider, TooltipTrigger, } from "~/components/primitives/Tooltip"; +import { env } from "~/env.server"; +import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; import { useEnvironment } from "~/hooks/useEnvironment"; import { useOrganization } from "~/hooks/useOrganizations"; import { useProject } from "~/hooks/useProject"; +import { useThrottle } from "~/hooks/useThrottle"; import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/message.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getUserById } from "~/models/user.server"; import { EnvironmentQueuePresenter } from "~/presenters/v3/EnvironmentQueuePresenter.server"; import { QueueListPresenter } from "~/presenters/v3/QueueListPresenter.server"; import { requireUserId } from "~/services/session.server"; import { cn } from "~/utils/cn"; import { docsPath, EnvironmentParamSchema, v3BillingPath, v3RunsPath } from "~/utils/pathBuilder"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; import { PauseEnvironmentService } from "~/v3/services/pauseEnvironment.server"; import { PauseQueueService } from "~/v3/services/pauseQueue.server"; import { useCurrentPlan } from "../_app.orgs.$organizationSlug/route"; -import { Header3 } from "~/components/primitives/Headers"; -import { Input } from "~/components/primitives/Input"; -import { useThrottle } from "~/hooks/useThrottle"; -import { RunsIcon } from "~/assets/icons/RunsIcon"; -import { useAutoRevalidate } from "~/hooks/useAutoRevalidate"; -import { env } from "~/env.server"; const SearchParamsSchema = z.object({ query: z.string().optional(), @@ -208,6 +212,75 @@ export const action = async ({ request, params }: ActionFunctionArgs) => { `Queue ${action === "queue-pause" ? "paused" : "resumed"}` ); } + case "queue-override": { + const friendlyId = formData.get("friendlyId"); + const concurrencyLimit = formData.get("concurrencyLimit"); + + if (!friendlyId) { + return redirectWithErrorMessage(redirectPath, request, "Queue ID is required"); + } + + if (!concurrencyLimit) { + return redirectWithErrorMessage(redirectPath, request, "Concurrency limit is required"); + } + + const limitNumber = parseInt(concurrencyLimit.toString(), 10); + if (isNaN(limitNumber) || limitNumber < 0) { + return redirectWithErrorMessage( + redirectPath, + request, + "Concurrency limit must be a valid number" + ); + } + + const user = await getUserById(userId); + if (!user) { + return redirectWithErrorMessage(redirectPath, request, "User not found"); + } + + const result = await concurrencySystem.queues.overrideQueueConcurrencyLimit( + environment, + friendlyId.toString(), + limitNumber, + user + ); + + if (!result.isOk()) { + return redirectWithErrorMessage( + redirectPath, + request, + "Failed to override queue concurrency limit" + ); + } + + return redirectWithSuccessMessage( + redirectPath, + request, + "Queue concurrency limit overridden" + ); + } + case "queue-remove-override": { + const friendlyId = formData.get("friendlyId"); + + if (!friendlyId) { + return redirectWithErrorMessage(redirectPath, request, "Queue ID is required"); + } + + const result = await concurrencySystem.queues.resetConcurrencyLimit( + environment, + friendlyId.toString() + ); + + if (!result.isOk()) { + return redirectWithErrorMessage( + redirectPath, + request, + "Failed to reset queue concurrency limit" + ); + } + + return redirectWithSuccessMessage(redirectPath, request, "Queue concurrency limit reset"); + } default: return redirectWithErrorMessage(redirectPath, request, "Something went wrong"); } @@ -267,17 +340,19 @@ export default function Page() { animate accessory={
+ {environment.runsEnabled ? : null} - View runs - - {environment.runsEnabled ? : null} + tooltip="View queued runs" + />
} valueClassName={env.paused ? "text-warning" : undefined} @@ -300,15 +375,17 @@ export default function Page() { } accessory={ - View runs - + tooltip="View runs" + /> } compactThreshold={1000000} /> @@ -375,7 +452,8 @@ export default function Page() { Name Queued - Running/limit + Running + Limit +
+ Override + + This queue's concurrency limit has been manually overridden from the + dashboard or API. + +
} > @@ -415,7 +504,7 @@ export default function Page() { {queues.length > 0 ? ( queues.map((queue) => { const limit = queue.concurrencyLimit ?? environment.concurrencyLimit; - const isAtLimit = queue.running === limit; + const isAtLimit = queue.running >= limit; const queueFilterableName = `${queue.type === "task" ? "task/" : ""}${ queue.name }`; @@ -451,6 +540,18 @@ export default function Page() { {queue.name} + {queue.concurrency?.overriddenAt ? ( + + Concurrency limit overridden + + } + content="This queue's concurrency limit has been manually overridden from the dashboard or API." + className="max-w-xs" + disableHoverableContent + /> + ) : null} {queue.paused ? ( Paused @@ -465,31 +566,50 @@ export default function Page() { {queue.queued} 0 && "text-text-bright", isAtLimit && "text-warning" )} > - {queue.running}/ - - {limit} - + {queue.running} + {limit} + + - {queue.concurrencyLimit ? "User" : "Environment"} + {queue.concurrency?.overriddenAt ? ( + Override + ) : queue.concurrencyLimit ? ( + "User" + ) : ( + "Environment" + )} )} - - View all runs - - + - View queued runs - - + - View running runs - + /> + } /> @@ -568,7 +681,7 @@ export default function Page() { }) ) : ( - +
{hasFilters @@ -725,29 +838,26 @@ function QueuePauseResumeButton({ fullWidth?: boolean; showTooltip?: boolean; }) { - const navigation = useNavigation(); const [isOpen, setIsOpen] = useState(false); - const button = ( - - ); - const trigger = showTooltip ? (
- {button} + + +
@@ -759,7 +869,13 @@ function QueuePauseResumeButton({
) : ( - {button} + + + ); return ( @@ -806,6 +922,128 @@ function QueuePauseResumeButton({ ); } +function QueueOverrideConcurrencyButton({ + queue, + environmentConcurrencyLimit, +}: { + queue: { + id: string; + name: string; + concurrencyLimit: number | null; + concurrency?: { overriddenAt: Date | null }; + }; + environmentConcurrencyLimit: number; +}) { + const navigation = useNavigation(); + const [isOpen, setIsOpen] = useState(false); + const [concurrencyLimit, setConcurrencyLimit] = useState( + queue.concurrencyLimit?.toString() ?? environmentConcurrencyLimit.toString() + ); + + const isOverridden = !!queue.concurrency?.overriddenAt; + const currentLimit = queue.concurrencyLimit ?? environmentConcurrencyLimit; + + useEffect(() => { + if (navigation.state === "loading" || navigation.state === "idle") { + setIsOpen(false); + } + }, [navigation.state]); + + const isLoading = Boolean( + navigation.formData?.get("action") === "queue-override" || + navigation.formData?.get("action") === "queue-remove-override" + ); + + return ( + + + + + + + {isOverridden ? "Edit concurrency override" : "Override concurrency limit"} + +
+ {isOverridden ? ( + + This queue's concurrency limit is currently overridden to {currentLimit}. + {queue.concurrencyLimit !== null && + ` The original limit set in code was ${queue.concurrencyLimit}.`}{" "} + You can update the override or remove it to restore the{" "} + {queue.concurrencyLimit !== null + ? "limit set in code" + : "environment concurrency limit"} + . + + ) : ( + + Override this queue's concurrency limit. The current limit is {currentLimit}, which is + set {queue.concurrencyLimit !== null ? "in code" : "by the environment"}. + + )} +
setIsOpen(false)} className="space-y-3"> + +
+ + setConcurrencyLimit(e.target.value)} + placeholder={currentLimit.toString()} + autoFocus + /> +
+ + } + shortcut={{ modifiers: ["mod"], key: "enter" }} + > + {isOverridden ? "Update override" : "Override limit"} + + } + cancelButton={ +
+ {isOverridden && ( + + )} + + + +
+ } + /> + +
+
+
+ ); +} + function EngineVersionUpgradeCallout() { return (
diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts new file mode 100644 index 0000000000..3223a2a606 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.override.ts @@ -0,0 +1,75 @@ +import { json } from "@remix-run/server-runtime"; +import { type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; + +const BodySchema = z.object({ + type: RetrieveQueueType.default("id"), + concurrencyLimit: z.number().int().min(0).max(100000), +}); + +export const { action } = createActionApiRoute( + { + body: BodySchema, + params: z.object({ + queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")), + }), + }, + async ({ params, body, authentication }) => { + const input: RetrieveQueueParam = + body.type === "id" + ? params.queueParam + : { + type: body.type, + name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"), + }; + + return concurrencySystem.queues + .overrideQueueConcurrencyLimit(authentication.environment, input, body.concurrencyLimit) + .match( + (queue) => { + return json( + toQueueItem({ + friendlyId: queue.friendlyId, + name: queue.name, + type: queue.type, + running: queue.running, + queued: queue.queued, + concurrencyLimit: queue.concurrencyLimit, + concurrencyLimitBase: queue.concurrencyLimitBase, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt, + concurrencyLimitOverriddenBy: null, + paused: queue.paused, + }), + { status: 200 } + ); + }, + (error) => { + switch (error.type) { + case "queue_not_found": { + return json({ error: "Queue not found" }, { status: 404 }); + } + case "queue_update_failed": { + return json({ error: "Failed to update queue concurrency limit" }, { status: 500 }); + } + case "sync_queue_concurrency_to_engine_failed": { + return json( + { error: "Failed to sync queue concurrency limit to engine" }, + { status: 500 } + ); + } + case "get_queue_stats_failed": { + return json({ error: "Failed to get queue stats" }, { status: 500 }); + } + case "other": + default: { + error.type satisfies "other"; + return json({ error: "Internal server error" }, { status: 500 }); + } + } + } + ); + } +); diff --git a/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts new file mode 100644 index 0000000000..dbeea591ad --- /dev/null +++ b/apps/webapp/app/routes/api.v1.queues.$queueParam.concurrency.reset.ts @@ -0,0 +1,75 @@ +import { json } from "@remix-run/server-runtime"; +import { type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { concurrencySystem } from "~/v3/services/concurrencySystemInstance.server"; + +const BodySchema = z.object({ + type: RetrieveQueueType.default("id"), +}); + +export const { action } = createActionApiRoute( + { + body: BodySchema, + params: z.object({ + queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")), + }), + }, + async ({ params, body, authentication }) => { + const input: RetrieveQueueParam = + body.type === "id" + ? params.queueParam + : { + type: body.type, + name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"), + }; + + return concurrencySystem.queues.resetConcurrencyLimit(authentication.environment, input).match( + (queue) => { + return json( + toQueueItem({ + friendlyId: queue.friendlyId, + name: queue.name, + type: queue.type, + running: queue.running, + queued: queue.queued, + concurrencyLimit: queue.concurrencyLimit, + concurrencyLimitBase: queue.concurrencyLimitBase, + concurrencyLimitOverriddenAt: queue.concurrencyLimitOverriddenAt, + concurrencyLimitOverriddenBy: null, + paused: queue.paused, + }), + { status: 200 } + ); + }, + (error) => { + switch (error.type) { + case "queue_not_found": { + return json({ error: "Queue not found" }, { status: 404 }); + } + case "queue_not_overridden": { + return json({ error: "Queue is not overridden" }, { status: 400 }); + } + case "queue_update_failed": { + return json({ error: "Failed to update queue concurrency limit" }, { status: 500 }); + } + case "sync_queue_concurrency_to_engine_failed": { + return json( + { error: "Failed to sync queue concurrency limit to engine" }, + { status: 500 } + ); + } + case "get_queue_stats_failed": { + return json({ error: "Failed to get queue stats" }, { status: 500 }); + } + case "other": + default: { + error.type satisfies "other"; + return json({ error: "Internal server error" }, { status: 500 }); + } + } + } + ); + } +); diff --git a/apps/webapp/app/v3/services/concurrencySystem.server.ts b/apps/webapp/app/v3/services/concurrencySystem.server.ts new file mode 100644 index 0000000000..8ef33e4792 --- /dev/null +++ b/apps/webapp/app/v3/services/concurrencySystem.server.ts @@ -0,0 +1,209 @@ +import { TaskQueue, User } from "@trigger.dev/database"; +import { errAsync, fromPromise, okAsync } from "neverthrow"; +import { PrismaClientOrTransaction } from "~/db.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { removeQueueConcurrencyLimits, updateQueueConcurrencyLimits } from "../runQueue.server"; +import { engine } from "../runEngine.server"; + +export type ConcurrencySystemOptions = { + db: PrismaClientOrTransaction; + reader: PrismaClientOrTransaction; +}; + +export type QueueInput = string | { type: "task" | "custom"; name: string }; + +export class ConcurrencySystem { + constructor(private readonly options: ConcurrencySystemOptions) {} + + private get db() { + return this.options.db; + } + + get queues() { + return { + overrideQueueConcurrencyLimit: ( + environment: AuthenticatedEnvironment, + queue: QueueInput, + concurrencyLimit: number, + overriddenBy?: User + ) => { + return findQueueFromInput(this.db, environment, queue) + .andThen((queue) => + overrideQueueConcurrencyLimit( + this.db, + environment, + queue, + concurrencyLimit, + overriddenBy + ) + ) + .andThen((queue) => syncQueueConcurrencyToEngine(environment, queue)) + .andThen((queue) => getQueueStats(environment, queue)); + }, + resetConcurrencyLimit: (environment: AuthenticatedEnvironment, queue: QueueInput) => { + return findQueueFromInput(this.db, environment, queue) + .andThen((queue) => resetQueueConcurrencyLimit(this.db, queue)) + .andThen((queue) => syncQueueConcurrencyToEngine(environment, queue)) + .andThen((queue) => getQueueStats(environment, queue)); + }, + }; + } +} + +function findQueueFromInput( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: QueueInput +) { + if (typeof queue === "string") { + return findQueueByFriendlyId(db, environment, queue); + } + + const queueName = + queue.type === "task" ? `task/${queue.name.replace(/^task\//, "")}` : queue.name; + + return findQueueByName(db, environment, queueName); +} + +function findQueueByFriendlyId( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + friendlyId: string +) { + return fromPromise( + db.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + friendlyId, + }, + }), + (error) => ({ + type: "other" as const, + cause: error, + }) + ).andThen((queue) => { + if (!queue) { + return errAsync({ type: "queue_not_found" as const }); + } + return okAsync(queue); + }); +} + +function findQueueByName( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: string +) { + return fromPromise( + db.taskQueue.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + name: queue, + }, + }), + (error) => ({ + type: "other" as const, + cause: error, + }) + ).andThen((queue) => { + if (!queue) { + return errAsync({ type: "queue_not_found" as const }); + } + return okAsync(queue); + }); +} + +function overrideQueueConcurrencyLimit( + db: PrismaClientOrTransaction, + environment: AuthenticatedEnvironment, + queue: TaskQueue, + concurrencyLimit: number, + overriddenBy?: User +) { + const newConcurrencyLimit = Math.max( + Math.min( + concurrencyLimit, + environment.maximumConcurrencyLimit, + environment.organization.maximumConcurrencyLimit + ), + 0 + ); + + const concurrencyLimitBase = queue.concurrencyLimitOverriddenAt + ? queue.concurrencyLimitBase + : queue.concurrencyLimit; + + return fromPromise( + db.taskQueue.update({ + where: { + id: queue.id, + }, + data: { + concurrencyLimit: newConcurrencyLimit, + concurrencyLimitBase: concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: new Date(), + concurrencyLimitOverriddenBy: overriddenBy?.id ?? null, + }, + }), + (error) => ({ + type: "queue_update_failed" as const, + cause: error, + }) + ); +} + +function resetQueueConcurrencyLimit(db: PrismaClientOrTransaction, queue: TaskQueue) { + if (queue.concurrencyLimitOverriddenAt === null) { + return errAsync({ type: "queue_not_overridden" as const }); + } + + const newConcurrencyLimit = queue.concurrencyLimitBase; + + return fromPromise( + db.taskQueue.update({ + where: { id: queue.id }, + data: { + concurrencyLimitOverriddenAt: null, + concurrencyLimit: newConcurrencyLimit, + concurrencyLimitBase: null, + concurrencyLimitOverriddenBy: null, + }, + }), + (error) => ({ + type: "queue_update_failed" as const, + cause: error, + }) + ); +} + +function syncQueueConcurrencyToEngine(environment: AuthenticatedEnvironment, queue: TaskQueue) { + if (typeof queue.concurrencyLimit === "number") { + return fromPromise( + updateQueueConcurrencyLimits(environment, queue.name, queue.concurrencyLimit), + (error) => ({ + type: "sync_queue_concurrency_to_engine_failed" as const, + cause: error, + }) + ).andThen(() => okAsync(queue)); + } else { + return fromPromise(removeQueueConcurrencyLimits(environment, queue.name), (error) => ({ + type: "sync_queue_concurrency_to_engine_failed" as const, + cause: error, + })).andThen(() => okAsync(queue)); + } +} + +function getQueueStats(environment: AuthenticatedEnvironment, queue: TaskQueue) { + return fromPromise( + Promise.all([ + engine.lengthOfQueues(environment, [queue.name]), + engine.currentConcurrencyOfQueues(environment, [queue.name]), + ]), + (error) => ({ + type: "get_queue_stats_failed" as const, + cause: error, + }) + ).andThen(([queued, running]) => + okAsync({ queued: queued[queue.name] ?? 0, running: running[queue.name] ?? 0, ...queue }) + ); +} diff --git a/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts b/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts new file mode 100644 index 0000000000..fec48f2750 --- /dev/null +++ b/apps/webapp/app/v3/services/concurrencySystemInstance.server.ts @@ -0,0 +1,15 @@ +import { prisma, $replica } from "~/db.server"; +import { ConcurrencySystem } from "./concurrencySystem.server"; +import { singleton } from "~/utils/singleton"; + +export const concurrencySystem = singleton( + "concurrency-system", + initalizeConcurrencySystemInstance +); + +function initalizeConcurrencySystemInstance() { + return new ConcurrencySystem({ + db: prisma, + reader: $replica, + }); +} diff --git a/apps/webapp/app/v3/services/pauseQueue.server.ts b/apps/webapp/app/v3/services/pauseQueue.server.ts index f4e18eab4b..b5ba5f399a 100644 --- a/apps/webapp/app/v3/services/pauseQueue.server.ts +++ b/apps/webapp/app/v3/services/pauseQueue.server.ts @@ -88,6 +88,9 @@ export class PauseQueueService extends BaseService { running: results[1]?.[updatedQueue.name] ?? 0, queued: results[0]?.[updatedQueue.name] ?? 0, concurrencyLimit: updatedQueue.concurrencyLimit ?? null, + concurrencyLimitBase: updatedQueue.concurrencyLimitBase ?? null, + concurrencyLimitOverriddenAt: updatedQueue.concurrencyLimitOverriddenAt ?? null, + concurrencyLimitOverriddenBy: queue.concurrencyLimitOverriddenBy ?? null, paused: updatedQueue.paused, }), }; diff --git a/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql b/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql new file mode 100644 index 0000000000..825ef02b8d --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251009140053_add_task_queue_concurrency_limit_base_column/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitBase" INTEGER; \ No newline at end of file diff --git a/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql b/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql new file mode 100644 index 0000000000..13e5af08f5 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251010102348_add_concurrency_limit_overriden_at_to_task_queue/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitOverriddenAt" TIMESTAMP(3); + diff --git a/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql b/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql new file mode 100644 index 0000000000..d8a81ce175 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20251010114444_add_concurrency_limit_overridden_by_to_task_queue/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."TaskQueue" ADD COLUMN "concurrencyLimitOverriddenBy" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index af7cc70fe2..105dff4bef 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1482,8 +1482,15 @@ model TaskQueue { runtimeEnvironment RuntimeEnvironment @relation(fields: [runtimeEnvironmentId], references: [id], onDelete: Cascade, onUpdate: Cascade) runtimeEnvironmentId String - concurrencyLimit Int? - rateLimit Json? + /// Represents the current concurrency limit for the queue + concurrencyLimit Int? + /// When the concurrency limit was overridden + concurrencyLimitOverriddenAt DateTime? + /// Who overrode the concurrency limit (will be null if overridden via the API) + concurrencyLimitOverriddenBy String? + /// If concurrencyLimit is overridden, this is the overridden value + concurrencyLimitBase Int? + rateLimit Json? paused Boolean @default(false) diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index fdd4bfc5e5..7264faa148 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -899,6 +899,53 @@ export class ApiClient { ); } + overrideQueueConcurrencyLimit( + queue: RetrieveQueueParam, + concurrencyLimit: number, + requestOptions?: ZodFetchOptions + ) { + const type = typeof queue === "string" ? "id" : queue.type; + const value = typeof queue === "string" ? queue : queue.name; + + // Explicitly encode slashes before encoding the rest of the string + const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F")); + + return zodfetch( + QueueItem, + `${this.baseUrl}/api/v1/queues/${encodedValue}/concurrency/override`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ + type, + concurrencyLimit, + }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + resetQueueConcurrencyLimit(queue: RetrieveQueueParam, requestOptions?: ZodFetchOptions) { + const type = typeof queue === "string" ? "id" : queue.type; + const value = typeof queue === "string" ? queue : queue.name; + + // Explicitly encode slashes before encoding the rest of the string + const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F")); + + return zodfetch( + QueueItem, + `${this.baseUrl}/api/v1/queues/${encodedValue}/concurrency/reset`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ + type, + }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + subscribeToRun( runId: string, options?: { diff --git a/packages/core/src/v3/schemas/queues.ts b/packages/core/src/v3/schemas/queues.ts index cf42f0ed9a..34a47b34e3 100644 --- a/packages/core/src/v3/schemas/queues.ts +++ b/packages/core/src/v3/schemas/queues.ts @@ -28,10 +28,25 @@ export const QueueItem = z.object({ running: z.number(), /** The number of runs currently queued */ queued: z.number(), - /** The concurrency limit of the queue */ - concurrencyLimit: z.number().nullable(), /** Whether the queue is paused. If it's paused, no new runs will be started. */ paused: z.boolean(), + /** The concurrency limit of the queue */ + concurrencyLimit: z.number().nullable(), + /** The concurrency limit of the queue */ + concurrency: z + .object({ + /** The effective/current concurrency limit */ + current: z.number().nullable(), + /** The base concurrency limit (default) */ + base: z.number().nullable(), + /** The effective/current concurrency limit */ + override: z.number().nullable(), + /** When the override was applied */ + overriddenAt: z.coerce.date().nullable(), + /** Who overrode the concurrency limit (will be null if overridden via the API) */ + overriddenBy: z.string().nullable(), + }) + .optional(), }); export type QueueItem = z.infer; diff --git a/packages/trigger-sdk/src/v3/queues.ts b/packages/trigger-sdk/src/v3/queues.ts index 6788186f9d..f05a81ab68 100644 --- a/packages/trigger-sdk/src/v3/queues.ts +++ b/packages/trigger-sdk/src/v3/queues.ts @@ -132,6 +132,80 @@ export function pause( return apiClient.pauseQueue(queue, "pause", $requestOptions); } +/** + * Overrides the concurrency limit of a queue. + * + * @param queue - The ID of the queue to override the concurrency limit, or the type and name + * @param concurrencyLimit - The concurrency limit to override + * @returns The updated queue state + */ +export function overrideConcurrencyLimit( + queue: RetrieveQueueParam, + concurrencyLimit: number, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "queues.overrideConcurrencyLimit()", + icon: "queue", + attributes: { + ...flattenAttributes({ queue }), + ...accessoryAttributes({ + items: [ + { + text: typeof queue === "string" ? queue : queue.name, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.overrideQueueConcurrencyLimit(queue, concurrencyLimit, $requestOptions); +} + +/** + * Resets the concurrency limit of a queue to the base value. + * + * @param queue - The ID of the queue to reset the concurrency limit, or the type and name + * @returns The updated queue state + */ +export function resetConcurrencyLimit( + queue: RetrieveQueueParam, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "queues.resetConcurrencyLimit()", + icon: "queue", + attributes: { + ...flattenAttributes({ queue }), + ...accessoryAttributes({ + items: [ + { + text: typeof queue === "string" ? queue : queue.name, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.resetQueueConcurrencyLimit(queue, $requestOptions); +} + /** * Resumes a paused queue, allowing new runs to be started. * diff --git a/references/hello-world/src/trigger/queues.ts b/references/hello-world/src/trigger/queues.ts index 7eb05c99b4..69c4216b97 100644 --- a/references/hello-world/src/trigger/queues.ts +++ b/references/hello-world/src/trigger/queues.ts @@ -1,4 +1,4 @@ -import { batch, logger, queue, queues, task, tasks } from "@trigger.dev/sdk/v3"; +import { batch, logger, queue, queues, runs, task, tasks } from "@trigger.dev/sdk/v3"; export const queuesTester = task({ id: "queues-tester", @@ -37,6 +37,22 @@ export const queuesTester = task({ name: "queues-tester", }); logger.log("Resumed queue", { resumedQueue }); + + const overriddenQueue = await queues.overrideConcurrencyLimit( + { + type: "task", + name: "queues-tester", + }, + 10 + ); + logger.log("Overridden queue", { overriddenQueue }); + + const resetQueue = await queues.resetConcurrencyLimit({ + type: "task", + name: "queues-tester", + }); + + logger.log("Reset queue", { resetQueue }); }, }); @@ -262,3 +278,56 @@ export const lockedTaskIdentifierTask = task({ await tasks.triggerAndWait("task_does_not_exist", {}); }, }); + +export const testQueuesConcurrencyOverrideTask = task({ + id: "test-queues-concurrency-override-task", + run: async (payload: any) => { + logger.log("Test queues concurrency override task", { payload }); + + const handle1 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + const handle2 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + + const result1 = await runs.poll(handle1.id); + const result2 = await runs.poll(handle2.id); + + logger.log("Test queues concurrency override task results", { result1, result2 }); + + const handle3 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + const handle4 = await testQueuesConcurrencyOverrideChildTask.trigger({}); + + const overriddenQueue = await queues.overrideConcurrencyLimit( + { + type: "custom", + name: "test-queues-concurrency-override-queue", + }, + 2 + ); + + logger.log("Overridden queue", { overriddenQueue }); + + const result3 = await runs.poll(handle3.id); + const result4 = await runs.poll(handle4.id); + + logger.log("Test queues concurrency override task results", { result3, result4 }); + + const resetQueue = await queues.resetConcurrencyLimit({ + type: "custom", + name: "test-queues-concurrency-override-queue", + }); + + logger.log("Reset queue", { resetQueue }); + }, +}); + +export const testQueuesConcurrencyOverrideChildTask = task({ + id: "test-queues-concurrency-override-child-task", + queue: { + name: "test-queues-concurrency-override-queue", + concurrencyLimit: 1, + }, + run: async (payload: any) => { + logger.log("Test queues concurrency override child task", { payload }); + + await setTimeout(10000); + }, +});