Skip to content

Commit dea761f

Browse files
committed
Retry completeBatchTaskRunItem if they fail because of a retriable Prisma error
1 parent 1ba00b1 commit dea761f

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { env } from "~/env.server";
55
import { logger } from "~/services/logger.server";
66
import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
8+
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
9+
import { prisma } from "~/db.server";
810

911
function initializeWorker() {
1012
const redisOptions = {
@@ -34,6 +36,19 @@ function initializeWorker() {
3436
maxAttempts: 3,
3537
},
3638
},
39+
completeBatchTaskRunItem: {
40+
schema: z.object({
41+
itemId: z.string(),
42+
batchTaskRunId: z.string(),
43+
scheduleResumeOnComplete: z.boolean(),
44+
taskRunAttemptId: z.string().optional(),
45+
attempt: z.number().optional(),
46+
}),
47+
visibilityTimeoutMs: 60_000,
48+
retry: {
49+
maxAttempts: 10,
50+
},
51+
},
3752
},
3853
concurrency: {
3954
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
@@ -49,6 +64,16 @@ function initializeWorker() {
4964

5065
await service.call(payload.runId);
5166
},
67+
completeBatchTaskRunItem: async ({ payload, attempt }) => {
68+
await completeBatchTaskRunItemV3(
69+
payload.itemId,
70+
payload.batchTaskRunId,
71+
prisma,
72+
payload.scheduleResumeOnComplete,
73+
payload.taskRunAttemptId,
74+
attempt
75+
);
76+
},
5277
},
5378
});
5479

0 commit comments

Comments
 (0)