Skip to content

Commit b05a9b1

Browse files
icecrasher321waleedlatif1emir-karabeg
authored
feat(execution-queuing): async api mode + ratelimiting by subscription tier (#702)
* v1 queuing system * working async queue * working impl of sync + async request formats * fix tests * fix rate limit calc * fix rate limiting issues * regen migration * fix test * fix instrumentation script issues * remove use workflow queue env var * make modal have async examples * Remove conflicting 54th migration before merging staging * new migration files * remove console log * update modal correctly * working sync executor * works for sync * remove useless stats endpoint * fix tests * add sync exec timeout * working impl with cron job * migrate to trigger.dev * remove migration * remove unused code * update readme * restructure jobs API response * add logging for async execs * improvement: example ui/ux * use getBaseUrl() func --------- Co-authored-by: Waleed Latif <walif6@gmail.com> Co-authored-by: Emir Karabeg <emirkarabeg@berkeley.edu>
1 parent 11264ed commit b05a9b1

File tree

35 files changed

+7879
-568
lines changed

35 files changed

+7879
-568
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ bun run dev:sockets
147147
- **Docs**: [Fumadocs](https://fumadocs.vercel.app/)
148148
- **Monorepo**: [Turborepo](https://turborepo.org/)
149149
- **Realtime**: [Socket.io](https://socket.io/)
150+
- **Background Jobs**: [Trigger.dev](https://trigger.dev/)
150151

151152
## Contributing
152153

apps/sim/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,5 @@ next-env.d.ts
5050

5151
# Uploads
5252
/uploads
53+
54+
.trigger

apps/sim/app/api/chat/utils.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,38 @@ import type { NextResponse } from 'next/server'
77
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
88
import { env } from '@/lib/env'
99

10+
// Mock all the problematic imports that cause timeouts
11+
vi.mock('@/db', () => ({
12+
db: {
13+
select: vi.fn(),
14+
update: vi.fn(),
15+
},
16+
}))
17+
18+
vi.mock('@/lib/utils', () => ({
19+
decryptSecret: vi.fn().mockResolvedValue({ decrypted: 'test-secret' }),
20+
}))
21+
22+
vi.mock('@/lib/logs/enhanced-logging-session', () => ({
23+
EnhancedLoggingSession: vi.fn().mockImplementation(() => ({
24+
safeStart: vi.fn().mockResolvedValue(undefined),
25+
safeComplete: vi.fn().mockResolvedValue(undefined),
26+
safeCompleteWithError: vi.fn().mockResolvedValue(undefined),
27+
})),
28+
}))
29+
30+
vi.mock('@/executor', () => ({
31+
Executor: vi.fn(),
32+
}))
33+
34+
vi.mock('@/serializer', () => ({
35+
Serializer: vi.fn(),
36+
}))
37+
38+
vi.mock('@/stores/workflows/server-utils', () => ({
39+
mergeSubblockState: vi.fn().mockReturnValue({}),
40+
}))
41+
1042
describe('Chat API Utils', () => {
1143
beforeEach(() => {
1244
vi.resetModules()
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import { runs } from '@trigger.dev/sdk/v3'
2+
import { eq } from 'drizzle-orm'
3+
import { type NextRequest, NextResponse } from 'next/server'
4+
import { getSession } from '@/lib/auth'
5+
import { createLogger } from '@/lib/logs/console-logger'
6+
import { db } from '@/db'
7+
import { apiKey as apiKeyTable } from '@/db/schema'
8+
import { createErrorResponse } from '../../workflows/utils'
9+
10+
const logger = createLogger('TaskStatusAPI')
11+
12+
export async function GET(
13+
request: NextRequest,
14+
{ params }: { params: Promise<{ jobId: string }> }
15+
) {
16+
const { jobId: taskId } = await params
17+
const requestId = crypto.randomUUID().slice(0, 8)
18+
19+
try {
20+
logger.debug(`[${requestId}] Getting status for task: ${taskId}`)
21+
22+
// Try session auth first (for web UI)
23+
const session = await getSession()
24+
let authenticatedUserId: string | null = session?.user?.id || null
25+
26+
if (!authenticatedUserId) {
27+
const apiKeyHeader = request.headers.get('x-api-key')
28+
if (apiKeyHeader) {
29+
const [apiKeyRecord] = await db
30+
.select({ userId: apiKeyTable.userId })
31+
.from(apiKeyTable)
32+
.where(eq(apiKeyTable.key, apiKeyHeader))
33+
.limit(1)
34+
35+
if (apiKeyRecord) {
36+
authenticatedUserId = apiKeyRecord.userId
37+
}
38+
}
39+
}
40+
41+
if (!authenticatedUserId) {
42+
return createErrorResponse('Authentication required', 401)
43+
}
44+
45+
// Fetch task status from Trigger.dev
46+
const run = await runs.retrieve(taskId)
47+
48+
logger.debug(`[${requestId}] Task ${taskId} status: ${run.status}`)
49+
50+
// Map Trigger.dev status to our format
51+
const statusMap = {
52+
QUEUED: 'queued',
53+
WAITING_FOR_DEPLOY: 'queued',
54+
EXECUTING: 'processing',
55+
RESCHEDULED: 'processing',
56+
FROZEN: 'processing',
57+
COMPLETED: 'completed',
58+
CANCELED: 'cancelled',
59+
FAILED: 'failed',
60+
CRASHED: 'failed',
61+
INTERRUPTED: 'failed',
62+
SYSTEM_FAILURE: 'failed',
63+
EXPIRED: 'failed',
64+
} as const
65+
66+
const mappedStatus = statusMap[run.status as keyof typeof statusMap] || 'unknown'
67+
68+
// Build response based on status
69+
const response: any = {
70+
success: true,
71+
taskId,
72+
status: mappedStatus,
73+
metadata: {
74+
startedAt: run.startedAt,
75+
},
76+
}
77+
78+
// Add completion details if finished
79+
if (mappedStatus === 'completed') {
80+
response.output = run.output // This contains the workflow execution results
81+
response.metadata.completedAt = run.finishedAt
82+
response.metadata.duration = run.durationMs
83+
}
84+
85+
// Add error details if failed
86+
if (mappedStatus === 'failed') {
87+
response.error = run.error
88+
response.metadata.completedAt = run.finishedAt
89+
response.metadata.duration = run.durationMs
90+
}
91+
92+
// Add progress info if still processing
93+
if (mappedStatus === 'processing' || mappedStatus === 'queued') {
94+
response.estimatedDuration = 180000 // 3 minutes max from our config
95+
}
96+
97+
return NextResponse.json(response)
98+
} catch (error: any) {
99+
logger.error(`[${requestId}] Error fetching task status:`, error)
100+
101+
if (error.message?.includes('not found') || error.status === 404) {
102+
return createErrorResponse('Task not found', 404)
103+
}
104+
105+
return createErrorResponse('Failed to fetch task status', 500)
106+
}
107+
}
108+
109+
// TODO: Implement task cancellation via Trigger.dev API if needed
110+
// export async function DELETE() { ... }

0 commit comments

Comments
 (0)