Skip to content

Commit a66c1e3

Browse files
committed
feat(logs): added log archive
1 parent 4bbc2b2 commit a66c1e3

File tree

1 file changed

+104
-24
lines changed

1 file changed

+104
-24
lines changed
Lines changed: 104 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
import { NextResponse } from 'next/server'
2-
import { sql } from 'drizzle-orm'
2+
import { PutObjectCommand } from '@aws-sdk/client-s3'
3+
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
34
import { createLogger } from '@/lib/logs/console-logger'
5+
import { s3Client } from '@/lib/uploads/s3-client'
46
import { db } from '@/db'
5-
import { subscription, user, workflowLogs } from '@/db/schema'
7+
import { subscription, user, workflow, workflowLogs } from '@/db/schema'
8+
9+
export const dynamic = 'force-dynamic'
610

711
const logger = createLogger('LogsCleanup')
812

13+
const BATCH_SIZE = 500
14+
const S3_CONFIG = {
15+
bucket: process.env.S3_LOGS_BUCKET_NAME || '',
16+
region: process.env.AWS_REGION || '',
17+
}
18+
919
export async function POST(request: Request) {
1020
try {
1121
const authHeader = request.headers.get('authorization')
@@ -19,9 +29,13 @@ export async function POST(request: Request) {
1929
return new NextResponse('Unauthorized', { status: 401 })
2030
}
2131

32+
if (!S3_CONFIG.bucket || !S3_CONFIG.region) {
33+
return new NextResponse('Configuration error: S3 bucket or region not set', { status: 500 })
34+
}
35+
2236
const retentionDate = new Date()
2337
retentionDate.setDate(
24-
retentionDate.getDate() - Number(process.env.FREE_PLAN_LOG_RETENTION_DAYS)
38+
retentionDate.getDate() - Number(process.env.FREE_PLAN_LOG_RETENTION_DAYS || '7')
2539
)
2640

2741
const freeUsers = await db
@@ -39,39 +53,105 @@ export async function POST(request: Request) {
3953
}
4054

4155
const freeUserIds = freeUsers.map((u) => u.userId)
42-
logger.info(`Found ${freeUserIds.length} free users for log cleanup`)
4356

44-
const freeUserWorkflows = await db
45-
.select({ workflowId: workflowLogs.workflowId })
57+
const workflowsQuery = await db
58+
.select({ id: workflow.id })
59+
.from(workflow)
60+
.where(inArray(workflow.userId, freeUserIds))
61+
62+
if (workflowsQuery.length === 0) {
63+
logger.info('No workflows found for free users')
64+
return NextResponse.json({ message: 'No workflows found for cleanup' })
65+
}
66+
67+
const workflowIds = workflowsQuery.map((w) => w.id)
68+
69+
const oldLogs = await db
70+
.select({
71+
id: workflowLogs.id,
72+
workflowId: workflowLogs.workflowId,
73+
executionId: workflowLogs.executionId,
74+
level: workflowLogs.level,
75+
message: workflowLogs.message,
76+
duration: workflowLogs.duration,
77+
trigger: workflowLogs.trigger,
78+
createdAt: workflowLogs.createdAt,
79+
metadata: workflowLogs.metadata,
80+
})
4681
.from(workflowLogs)
47-
.innerJoin(
48-
sql`workflow`,
49-
sql`${workflowLogs.workflowId} = workflow.id AND workflow.user_id IN (${sql.join(freeUserIds)})`
82+
.where(
83+
and(
84+
inArray(workflowLogs.workflowId, workflowIds),
85+
lt(workflowLogs.createdAt, retentionDate)
86+
)
5087
)
51-
.groupBy(workflowLogs.workflowId)
88+
.limit(BATCH_SIZE)
5289

53-
if (freeUserWorkflows.length === 0) {
54-
logger.info('No free user workflows found for log cleanup')
90+
logger.info(`Found ${oldLogs.length} logs older than ${retentionDate.toISOString()} to archive`)
91+
92+
if (oldLogs.length === 0) {
5593
return NextResponse.json({ message: 'No logs to clean up' })
5694
}
5795

58-
const workflowIds = freeUserWorkflows.map((w) => w.workflowId)
96+
const results = {
97+
total: oldLogs.length,
98+
archived: 0,
99+
archiveFailed: 0,
100+
deleted: 0,
101+
deleteFailed: 0,
102+
}
59103

60-
const result = await db
61-
.delete(workflowLogs)
62-
.where(
63-
sql`${workflowLogs.workflowId} IN (${sql.join(workflowIds)}) AND ${workflowLogs.createdAt} < ${retentionDate}`
64-
)
65-
.returning({ id: workflowLogs.id })
104+
for (const log of oldLogs) {
105+
const today = new Date().toISOString().split('T')[0]
106+
107+
const logKey = `archived-logs/${today}/${log.id}.json`
108+
const logData = JSON.stringify(log)
66109

67-
logger.info(`Successfully cleaned up ${result.length} logs for free users`)
110+
try {
111+
await s3Client.send(
112+
new PutObjectCommand({
113+
Bucket: S3_CONFIG.bucket,
114+
Key: logKey,
115+
Body: logData,
116+
ContentType: 'application/json',
117+
Metadata: {
118+
logId: String(log.id),
119+
workflowId: String(log.workflowId),
120+
archivedAt: new Date().toISOString(),
121+
},
122+
})
123+
)
124+
125+
results.archived++
126+
127+
try {
128+
const deleteResult = await db
129+
.delete(workflowLogs)
130+
.where(eq(workflowLogs.id, log.id))
131+
.returning({ id: workflowLogs.id })
132+
133+
if (deleteResult.length > 0) {
134+
results.deleted++
135+
} else {
136+
results.deleteFailed++
137+
logger.warn(`Failed to delete log ${log.id} after archiving: No rows deleted`)
138+
}
139+
} catch (deleteError) {
140+
results.deleteFailed++
141+
logger.error(`Error deleting log ${log.id} after archiving:`, { deleteError })
142+
}
143+
} catch (archiveError) {
144+
results.archiveFailed++
145+
logger.error(`Failed to archive log ${log.id}:`, { archiveError })
146+
}
147+
}
68148

69149
return NextResponse.json({
70-
message: `Successfully cleaned up ${result.length} logs for free users`,
71-
deletedCount: result.length,
150+
message: `Successfully processed ${results.total} logs: archived ${results.archived}, deleted ${results.deleted}`,
151+
results,
72152
})
73153
} catch (error) {
74-
logger.error('Error cleaning up logs:', { error })
75-
return NextResponse.json({ error: 'Failed to clean up logs' }, { status: 500 })
154+
logger.error('Error in log cleanup process:', { error })
155+
return NextResponse.json({ error: 'Failed to process log cleanup' }, { status: 500 })
76156
}
77157
}

0 commit comments

Comments
 (0)