Skip to content

Commit 6d0927e

Browse files
committed
Support for the same run multiple times in the same batch
1 parent b73ca8b commit 6d0927e

File tree

3 files changed

+65
-34
lines changed

3 files changed

+65
-34
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
Warnings:
3+
4+
- A unique constraint covering the columns `[taskRunId,waitpointId,batchIndex]` on the table `TaskRunWaitpoint` will be added. If there are existing duplicate values, this will fail.
5+
6+
*/
7+
-- DropIndex
8+
DROP INDEX "TaskRunWaitpoint_taskRunId_waitpointId_key";
9+
10+
-- CreateIndex (multiple can have null batchIndex, so we need the other one below)
11+
CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_key" ON "TaskRunWaitpoint" ("taskRunId", "waitpointId", "batchIndex");
12+
13+
-- CreateIndex (where batchIndex is null)
14+
CREATE UNIQUE INDEX "TaskRunWaitpoint_taskRunId_waitpointId_batchIndex_null_key" ON "TaskRunWaitpoint"("taskRunId", "waitpointId") WHERE "batchIndex" IS NULL;

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2120,7 +2120,9 @@ model TaskRunWaitpoint {
21202120
createdAt DateTime @default(now())
21212121
updatedAt DateTime @updatedAt
21222122
2123-
@@unique([taskRunId, waitpointId])
2123+
/// There are two constraints, the one below and also one that Prisma doesn't support
2124+
/// The second one implemented in SQL only prevents a TaskRun + Waitpoint with a null batchIndex
2125+
@@unique([taskRunId, waitpointId, batchIndex])
21242126
@@index([taskRunId])
21252127
@@index([waitpointId])
21262128
}

internal-packages/run-engine/src/engine/executionSnapshots.ts

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -35,40 +35,55 @@ export async function getLatestExecutionSnapshot(
3535
...snapshot,
3636
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
3737
runFriendlyId: RunId.toFriendlyId(snapshot.runId),
38-
completedWaitpoints: snapshot.completedWaitpoints.map((w) => {
39-
const index = snapshot.completedWaitpointOrder.findIndex((s) => s === w.id);
38+
completedWaitpoints: snapshot.completedWaitpoints.flatMap((w) => {
39+
//get all indexes of the waitpoint in the completedWaitpointOrder
40+
//we do this because the same run can be in a batch multiple times (i.e. same idempotencyKey)
41+
let indexes: (number | undefined)[] = [];
42+
for (let i = 0; i < snapshot.completedWaitpointOrder.length; i++) {
43+
if (snapshot.completedWaitpointOrder[i] === w.id) {
44+
indexes.push(i);
45+
}
46+
}
47+
48+
if (indexes.length === 0) {
49+
indexes.push(undefined);
50+
}
4051

41-
return {
42-
id: w.id,
43-
index: index === -1 ? undefined : index,
44-
friendlyId: w.friendlyId,
45-
type: w.type,
46-
completedAt: w.completedAt ?? new Date(),
47-
idempotencyKey:
48-
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey ? w.idempotencyKey : undefined,
49-
completedByTaskRun: w.completedByTaskRunId
50-
? {
51-
id: w.completedByTaskRunId,
52-
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
53-
batch: snapshot.batchId
54-
? {
55-
id: snapshot.batchId,
56-
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
57-
}
58-
: undefined,
59-
}
60-
: undefined,
61-
completedAfter: w.completedAfter ?? undefined,
62-
completedByBatch: w.completedByBatchId
63-
? {
64-
id: w.completedByBatchId,
65-
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
66-
}
67-
: undefined,
68-
output: w.output ?? undefined,
69-
outputType: w.outputType,
70-
outputIsError: w.outputIsError,
71-
} satisfies CompletedWaitpoint;
52+
return indexes.map((index) => {
53+
return {
54+
id: w.id,
55+
index: index === -1 ? undefined : index,
56+
friendlyId: w.friendlyId,
57+
type: w.type,
58+
completedAt: w.completedAt ?? new Date(),
59+
idempotencyKey:
60+
w.userProvidedIdempotencyKey && !w.inactiveIdempotencyKey
61+
? w.idempotencyKey
62+
: undefined,
63+
completedByTaskRun: w.completedByTaskRunId
64+
? {
65+
id: w.completedByTaskRunId,
66+
friendlyId: RunId.toFriendlyId(w.completedByTaskRunId),
67+
batch: snapshot.batchId
68+
? {
69+
id: snapshot.batchId,
70+
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
71+
}
72+
: undefined,
73+
}
74+
: undefined,
75+
completedAfter: w.completedAfter ?? undefined,
76+
completedByBatch: w.completedByBatchId
77+
? {
78+
id: w.completedByBatchId,
79+
friendlyId: BatchId.toFriendlyId(w.completedByBatchId),
80+
}
81+
: undefined,
82+
output: w.output ?? undefined,
83+
outputType: w.outputType,
84+
outputIsError: w.outputIsError,
85+
} satisfies CompletedWaitpoint;
86+
});
7287
}),
7388
};
7489
}

0 commit comments

Comments
 (0)