Skip to content

Commit 9cb8d13

Browse files
committed
remove tx
1 parent b8a792a commit 9cb8d13

File tree

1 file changed

+160
-125
lines changed

1 file changed

+160
-125
lines changed

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 160 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ import {
44
TaskRunExecutionResult,
55
} from "@trigger.dev/core/v3";
66
import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket";
7-
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
87
import { logger } from "~/services/logger.server";
98
import { marqs } from "~/v3/marqs/index.server";
109
import { socketIo } from "../handleSocketIo.server";
1110
import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server";
1211
import { BaseService } from "./baseService.server";
1312
import { TaskRunAttempt } from "@trigger.dev/database";
14-
import { isFinalRunStatus } from "../taskStatus";
13+
import { FINAL_ATTEMPT_STATUSES, FINAL_RUN_STATUSES, isFinalRunStatus } from "../taskStatus";
1514

1615
export class ResumeAttemptService extends BaseService {
1716
private _logger = logger;
@@ -21,145 +20,139 @@ export class ResumeAttemptService extends BaseService {
2120
): Promise<void> {
2221
this._logger.debug(`ResumeAttemptService.call()`, params);
2322

24-
await $transaction(this._prisma, async (tx) => {
25-
const attempt = await tx.taskRunAttempt.findFirst({
26-
where: {
27-
friendlyId: params.attemptFriendlyId,
28-
},
29-
include: {
30-
taskRun: true,
31-
dependencies: {
32-
include: {
33-
taskRun: {
34-
include: {
35-
attempts: {
36-
orderBy: {
37-
number: "desc",
38-
},
39-
take: 1,
40-
select: {
41-
id: true,
42-
},
23+
const attempt = await this._prisma.taskRunAttempt.findFirst({
24+
where: {
25+
friendlyId: params.attemptFriendlyId,
26+
},
27+
include: {
28+
taskRun: true,
29+
dependencies: {
30+
include: {
31+
taskRun: {
32+
include: {
33+
attempts: {
34+
orderBy: {
35+
number: "desc",
36+
},
37+
take: 1,
38+
select: {
39+
id: true,
4340
},
4441
},
4542
},
4643
},
47-
orderBy: {
48-
createdAt: "desc",
49-
},
50-
take: 1,
5144
},
52-
batchDependencies: {
53-
include: {
54-
items: {
55-
include: {
56-
taskRun: {
57-
include: {
58-
attempts: {
59-
orderBy: {
60-
number: "desc",
61-
},
62-
take: 1,
63-
select: {
64-
id: true,
65-
},
45+
orderBy: {
46+
createdAt: "desc",
47+
},
48+
take: 1,
49+
},
50+
batchDependencies: {
51+
include: {
52+
items: {
53+
include: {
54+
taskRun: {
55+
include: {
56+
attempts: {
57+
orderBy: {
58+
number: "desc",
59+
},
60+
take: 1,
61+
select: {
62+
id: true,
6663
},
6764
},
6865
},
6966
},
7067
},
7168
},
72-
orderBy: {
73-
createdAt: "desc",
74-
},
75-
take: 1,
7669
},
70+
orderBy: {
71+
createdAt: "desc",
72+
},
73+
take: 1,
7774
},
78-
});
75+
},
76+
});
7977

80-
if (!attempt) {
81-
this._logger.error("Could not find attempt", params);
82-
return;
83-
}
78+
if (!attempt) {
79+
this._logger.error("Could not find attempt", params);
80+
return;
81+
}
8482

85-
this._logger = logger.child({
86-
attemptId: attempt.id,
87-
attemptFriendlyId: attempt.friendlyId,
88-
taskRun: attempt.taskRun,
89-
});
83+
this._logger = logger.child({
84+
attemptId: attempt.id,
85+
attemptFriendlyId: attempt.friendlyId,
86+
taskRun: attempt.taskRun,
87+
});
9088

91-
if (isFinalRunStatus(attempt.taskRun.status)) {
92-
this._logger.error("Run is not resumable");
93-
return;
94-
}
89+
if (isFinalRunStatus(attempt.taskRun.status)) {
90+
this._logger.error("Run is not resumable");
91+
return;
92+
}
9593

96-
let completedAttemptIds: string[] = [];
94+
let completedAttemptIds: string[] = [];
9795

98-
switch (params.type) {
99-
case "WAIT_FOR_DURATION": {
100-
this._logger.debug("Sending duration wait resume message");
96+
switch (params.type) {
97+
case "WAIT_FOR_DURATION": {
98+
this._logger.debug("Sending duration wait resume message");
10199

102-
await this.#setPostResumeStatuses(attempt, tx);
100+
await this.#setPostResumeStatuses(attempt);
103101

104-
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DURATION", {
105-
version: "v1",
106-
attemptId: attempt.id,
107-
attemptFriendlyId: attempt.friendlyId,
108-
});
109-
break;
110-
}
111-
case "WAIT_FOR_TASK": {
112-
if (attempt.dependencies.length) {
113-
// We only care about the latest dependency
114-
const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0];
115-
116-
if (!dependentAttempt) {
117-
this._logger.error("No dependent attempt");
118-
return;
119-
}
120-
121-
completedAttemptIds = [dependentAttempt.id];
122-
} else {
123-
this._logger.error("No task dependency");
102+
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DURATION", {
103+
version: "v1",
104+
attemptId: attempt.id,
105+
attemptFriendlyId: attempt.friendlyId,
106+
});
107+
break;
108+
}
109+
case "WAIT_FOR_TASK": {
110+
if (attempt.dependencies.length) {
111+
// We only care about the latest dependency
112+
const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0];
113+
114+
if (!dependentAttempt) {
115+
this._logger.error("No dependent attempt");
124116
return;
125117
}
126118

127-
await this.#handleDependencyResume(attempt, completedAttemptIds, tx);
128-
129-
break;
119+
completedAttemptIds = [dependentAttempt.id];
120+
} else {
121+
this._logger.error("No task dependency");
122+
return;
130123
}
131-
case "WAIT_FOR_BATCH": {
132-
if (attempt.batchDependencies) {
133-
// We only care about the latest batch dependency
134-
const dependentBatchItems = attempt.batchDependencies[0].items;
135-
136-
if (!dependentBatchItems) {
137-
this._logger.error("No dependent batch items");
138-
return;
139-
}
140-
141-
completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);
142-
} else {
143-
this._logger.error("No batch dependency");
124+
125+
await this.#handleDependencyResume(attempt, completedAttemptIds);
126+
127+
break;
128+
}
129+
case "WAIT_FOR_BATCH": {
130+
if (attempt.batchDependencies) {
131+
// We only care about the latest batch dependency
132+
const dependentBatchItems = attempt.batchDependencies[0].items;
133+
134+
if (!dependentBatchItems) {
135+
this._logger.error("No dependent batch items");
144136
return;
145137
}
146138

147-
await this.#handleDependencyResume(attempt, completedAttemptIds, tx);
148-
149-
break;
150-
}
151-
default: {
152-
break;
139+
completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);
140+
} else {
141+
this._logger.error("No batch dependency");
142+
return;
153143
}
144+
145+
await this.#handleDependencyResume(attempt, completedAttemptIds);
146+
147+
break;
154148
}
155-
});
149+
default: {
150+
break;
151+
}
152+
}
156153
}
157154

158-
async #handleDependencyResume(
159-
attempt: TaskRunAttempt,
160-
completedAttemptIds: string[],
161-
tx: PrismaClientOrTransaction
162-
) {
155+
async #handleDependencyResume(attempt: TaskRunAttempt, completedAttemptIds: string[]) {
163156
if (completedAttemptIds.length === 0) {
164157
this._logger.error("No completed attempt IDs");
165158
return;
@@ -169,7 +162,7 @@ export class ResumeAttemptService extends BaseService {
169162
const executions: TaskRunExecution[] = [];
170163

171164
for (const completedAttemptId of completedAttemptIds) {
172-
const completedAttempt = await tx.taskRunAttempt.findFirst({
165+
const completedAttempt = await this._prisma.taskRunAttempt.findFirst({
173166
where: {
174167
id: completedAttemptId,
175168
taskRun: {
@@ -221,7 +214,7 @@ export class ResumeAttemptService extends BaseService {
221214
executions.push(executionPayload.execution);
222215
}
223216

224-
await this.#setPostResumeStatuses(attempt, tx);
217+
await this.#setPostResumeStatuses(attempt);
225218

226219
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
227220
version: "v1",
@@ -233,21 +226,63 @@ export class ResumeAttemptService extends BaseService {
233226
});
234227
}
235228

236-
async #setPostResumeStatuses(attempt: TaskRunAttempt, tx: PrismaClientOrTransaction) {
237-
return await tx.taskRunAttempt.update({
238-
where: {
239-
id: attempt.id,
240-
},
241-
data: {
242-
status: "EXECUTING",
243-
taskRun: {
244-
update: {
245-
data: {
246-
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
229+
async #setPostResumeStatuses(attempt: TaskRunAttempt) {
230+
try {
231+
const updatedAttempt = await this._prisma.taskRunAttempt.update({
232+
where: {
233+
id: attempt.id,
234+
status: {
235+
notIn: FINAL_ATTEMPT_STATUSES,
236+
},
237+
taskRun: {
238+
status: {
239+
notIn: FINAL_RUN_STATUSES,
247240
},
248241
},
249242
},
250-
},
251-
});
243+
data: {
244+
status: "EXECUTING",
245+
taskRun: {
246+
update: {
247+
data: {
248+
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
249+
},
250+
},
251+
},
252+
},
253+
select: {
254+
id: true,
255+
status: true,
256+
taskRun: {
257+
select: {
258+
id: true,
259+
status: true,
260+
},
261+
},
262+
},
263+
});
264+
265+
this._logger.debug("Set post resume statuses", {
266+
run: {
267+
id: updatedAttempt.taskRun.id,
268+
status: updatedAttempt.taskRun.status,
269+
},
270+
attempt: {
271+
id: updatedAttempt.id,
272+
status: updatedAttempt.status,
273+
},
274+
});
275+
} catch (error) {
276+
this._logger.error("Failed to set post resume statuses", {
277+
error:
278+
error instanceof Error
279+
? {
280+
name: error.name,
281+
message: error.message,
282+
stack: error.stack,
283+
}
284+
: error,
285+
});
286+
}
252287
}
253288
}

0 commit comments

Comments
 (0)