Skip to content

Commit 95c8073

Browse files
committed
Use finishWaitpoint, removed extra worker job
1 parent 03c485b commit 95c8073

File tree

1 file changed

+16
-30
lines changed
  • internal-packages/run-engine/src/engine

1 file changed

+16
-30
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,6 @@ const workerCatalog = {
119119
}),
120120
visibilityTimeoutMs: 10_000,
121121
},
122-
timeoutWaitpoint: {
123-
schema: z.object({
124-
waitpointId: z.string(),
125-
timeout: z.coerce.date(),
126-
}),
127-
visibilityTimeoutMs: 10_000,
128-
},
129122
};
130123

131124
type EngineWorker = Worker<typeof workerCatalog>;
@@ -207,12 +200,6 @@ export class RunEngine {
207200
runId: payload.runId,
208201
});
209202
},
210-
timeoutWaitpoint: async ({ payload }) => {
211-
await this.#timeoutWaitpoint({
212-
waitpointId: payload.waitpointId,
213-
timeout: payload.timeout,
214-
});
215-
},
216203
},
217204
}).start();
218205

@@ -1815,9 +1802,15 @@ export class RunEngine {
18151802
//schedule the timeout
18161803
if (timeout) {
18171804
await this.worker.enqueue({
1818-
id: `timeoutWaitpoint.${waitpoint.id}`,
1819-
job: "timeoutWaitpoint",
1820-
payload: { waitpointId: waitpoint.id, timeout },
1805+
id: `finishWaitpoint.${waitpoint.id}`,
1806+
job: "finishWaitpoint",
1807+
payload: {
1808+
waitpointId: waitpoint.id,
1809+
error: JSON.stringify({
1810+
code: WAITPOINT_TIMEOUT_ERROR_CODE,
1811+
message: `Waitpoint timed out at ${timeout.toISOString()}`,
1812+
}),
1813+
},
18211814
availableAt: timeout,
18221815
});
18231816
}
@@ -2054,7 +2047,13 @@ export class RunEngine {
20542047
await this.worker.enqueue({
20552048
id: `finishWaitpoint.${waitpoint}`,
20562049
job: "finishWaitpoint",
2057-
payload: { waitpointId: waitpoint, error: "Waitpoint timed out" },
2050+
payload: {
2051+
waitpointId: waitpoint,
2052+
error: JSON.stringify({
2053+
code: WAITPOINT_TIMEOUT_ERROR_CODE,
2054+
message: `Waitpoint timed out at ${failAfter.toISOString()}`,
2055+
}),
2056+
},
20582057
availableAt: failAfter,
20592058
});
20602059
}
@@ -3554,19 +3553,6 @@ export class RunEngine {
35543553
};
35553554
}
35563555

3557-
async #timeoutWaitpoint({ waitpointId, timeout }: { waitpointId: string; timeout: Date }) {
3558-
await this.completeWaitpoint({
3559-
id: waitpointId,
3560-
output: {
3561-
value: JSON.stringify({
3562-
code: WAITPOINT_TIMEOUT_ERROR_CODE,
3563-
message: `Waitpoint timed out at ${timeout.toISOString()}`,
3564-
}),
3565-
isError: true,
3566-
},
3567-
});
3568-
}
3569-
35703556
async #clearBlockingWaitpoints({ runId, tx }: { runId: string; tx?: PrismaClientOrTransaction }) {
35713557
const prisma = tx ?? this.prisma;
35723558
const deleted = await prisma.taskRunWaitpoint.deleteMany({

0 commit comments

Comments
 (0)