Skip to content

Commit fde9918

Browse files
committed
Added waitpoint idempotency tests
1 parent 95c8073 commit fde9918

File tree

1 file changed

+319
-0
lines changed

1 file changed

+319
-0
lines changed

internal-packages/run-engine/src/engine/tests/waitpoints.test.ts

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,4 +817,323 @@ describe("RunEngine Waitpoints", () => {
817817
}
818818
}
819819
);
820+
821+
containerTest(
822+
"Manual waitpoint with idempotency",
823+
{ timeout: 15_000 },
824+
async ({ prisma, redisOptions }) => {
825+
//create environment
826+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
827+
828+
const engine = new RunEngine({
829+
prisma,
830+
worker: {
831+
redis: redisOptions,
832+
workers: 1,
833+
tasksPerWorker: 10,
834+
pollIntervalMs: 100,
835+
},
836+
queue: {
837+
redis: redisOptions,
838+
},
839+
runLock: {
840+
redis: redisOptions,
841+
},
842+
machines: {
843+
defaultMachine: "small-1x",
844+
machines: {
845+
"small-1x": {
846+
name: "small-1x" as const,
847+
cpu: 0.5,
848+
memory: 0.5,
849+
centsPerMs: 0.0001,
850+
},
851+
},
852+
baseCostInCents: 0.0001,
853+
},
854+
tracer: trace.getTracer("test", "0.0.0"),
855+
});
856+
857+
try {
858+
const taskIdentifier = "test-task";
859+
860+
//create background worker
861+
await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier);
862+
863+
//trigger the run
864+
const run = await engine.trigger(
865+
{
866+
number: 1,
867+
friendlyId: "run_p1234",
868+
environment: authenticatedEnvironment,
869+
taskIdentifier,
870+
payload: "{}",
871+
payloadType: "application/json",
872+
context: {},
873+
traceContext: {},
874+
traceId: "t12345",
875+
spanId: "s12345",
876+
masterQueue: "main",
877+
queueName: "task/test-task",
878+
isTest: false,
879+
tags: [],
880+
},
881+
prisma
882+
);
883+
884+
//dequeue the run
885+
const dequeued = await engine.dequeueFromMasterQueue({
886+
consumerId: "test_12345",
887+
masterQueue: run.masterQueue,
888+
maxRunCount: 10,
889+
});
890+
891+
//create an attempt
892+
const attemptResult = await engine.startRunAttempt({
893+
runId: dequeued[0].run.id,
894+
snapshotId: dequeued[0].snapshot.id,
895+
});
896+
expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING");
897+
898+
const idempotencyKey = "a-key";
899+
900+
//create a manual waitpoint with timeout
901+
const waitpoint = await engine.createManualWaitpoint({
902+
environmentId: authenticatedEnvironment.id,
903+
projectId: authenticatedEnvironment.projectId,
904+
idempotencyKey,
905+
});
906+
expect(waitpoint.status).toBe("PENDING");
907+
expect(waitpoint.idempotencyKey).toBe(idempotencyKey);
908+
expect(waitpoint.userProvidedIdempotencyKey).toBe(true);
909+
910+
//block the run
911+
await engine.blockRunWithWaitpoint({
912+
runId: run.id,
913+
waitpoints: waitpoint.id,
914+
environmentId: authenticatedEnvironment.id,
915+
projectId: authenticatedEnvironment.projectId,
916+
});
917+
918+
const executionData = await engine.getRunExecutionData({ runId: run.id });
919+
expect(executionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
920+
921+
//check there is a waitpoint blocking the parent run
922+
const runWaitpointBefore = await prisma.taskRunWaitpoint.findFirst({
923+
where: {
924+
taskRunId: run.id,
925+
},
926+
include: {
927+
waitpoint: true,
928+
},
929+
});
930+
expect(runWaitpointBefore?.waitpointId).toBe(waitpoint.id);
931+
932+
let event: EventBusEventArgs<"workerNotification">[0] | undefined = undefined;
933+
engine.eventBus.on("workerNotification", (result) => {
934+
event = result;
935+
});
936+
937+
//complete the waitpoint
938+
await engine.completeWaitpoint({
939+
id: waitpoint.id,
940+
});
941+
942+
await setTimeout(200);
943+
944+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
945+
expect(executionData2?.snapshot.executionStatus).toBe("EXECUTING");
946+
947+
assertNonNullable(event);
948+
const notificationEvent = event as EventBusEventArgs<"workerNotification">[0];
949+
expect(notificationEvent.run.id).toBe(run.id);
950+
951+
//check there are no waitpoints blocking the parent run
952+
const runWaitpoint = await prisma.taskRunWaitpoint.findFirst({
953+
where: {
954+
taskRunId: run.id,
955+
},
956+
include: {
957+
waitpoint: true,
958+
},
959+
});
960+
expect(runWaitpoint).toBeNull();
961+
962+
const waitpoint2 = await prisma.waitpoint.findUnique({
963+
where: {
964+
id: waitpoint.id,
965+
},
966+
});
967+
assertNonNullable(waitpoint2);
968+
expect(waitpoint2.status).toBe("COMPLETED");
969+
expect(waitpoint2.outputIsError).toBe(false);
970+
} finally {
971+
engine.quit();
972+
}
973+
}
974+
);
975+
976+
containerTest(
977+
"Manual waitpoint with idempotency and ttl",
978+
{ timeout: 15_000 },
979+
async ({ prisma, redisOptions }) => {
980+
//create environment
981+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
982+
983+
const engine = new RunEngine({
984+
prisma,
985+
worker: {
986+
redis: redisOptions,
987+
workers: 1,
988+
tasksPerWorker: 10,
989+
pollIntervalMs: 100,
990+
},
991+
queue: {
992+
redis: redisOptions,
993+
},
994+
runLock: {
995+
redis: redisOptions,
996+
},
997+
machines: {
998+
defaultMachine: "small-1x",
999+
machines: {
1000+
"small-1x": {
1001+
name: "small-1x" as const,
1002+
cpu: 0.5,
1003+
memory: 0.5,
1004+
centsPerMs: 0.0001,
1005+
},
1006+
},
1007+
baseCostInCents: 0.0001,
1008+
},
1009+
tracer: trace.getTracer("test", "0.0.0"),
1010+
});
1011+
1012+
try {
1013+
const taskIdentifier = "test-task";
1014+
1015+
//create background worker
1016+
await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier);
1017+
1018+
//trigger the run
1019+
const run = await engine.trigger(
1020+
{
1021+
number: 1,
1022+
friendlyId: "run_p1234",
1023+
environment: authenticatedEnvironment,
1024+
taskIdentifier,
1025+
payload: "{}",
1026+
payloadType: "application/json",
1027+
context: {},
1028+
traceContext: {},
1029+
traceId: "t12345",
1030+
spanId: "s12345",
1031+
masterQueue: "main",
1032+
queueName: "task/test-task",
1033+
isTest: false,
1034+
tags: [],
1035+
},
1036+
prisma
1037+
);
1038+
1039+
//dequeue the run
1040+
const dequeued = await engine.dequeueFromMasterQueue({
1041+
consumerId: "test_12345",
1042+
masterQueue: run.masterQueue,
1043+
maxRunCount: 10,
1044+
});
1045+
1046+
//create an attempt
1047+
const attemptResult = await engine.startRunAttempt({
1048+
runId: dequeued[0].run.id,
1049+
snapshotId: dequeued[0].snapshot.id,
1050+
});
1051+
expect(attemptResult.snapshot.executionStatus).toBe("EXECUTING");
1052+
1053+
const idempotencyKey = "a-key";
1054+
1055+
//create a manual waitpoint with timeout
1056+
const waitpoint = await engine.createManualWaitpoint({
1057+
environmentId: authenticatedEnvironment.id,
1058+
projectId: authenticatedEnvironment.projectId,
1059+
idempotencyKey,
1060+
idempotencyKeyExpiresAt: new Date(Date.now() + 200),
1061+
});
1062+
expect(waitpoint.status).toBe("PENDING");
1063+
expect(waitpoint.idempotencyKey).toBe(idempotencyKey);
1064+
expect(waitpoint.userProvidedIdempotencyKey).toBe(true);
1065+
1066+
const sameWaitpoint = await engine.createManualWaitpoint({
1067+
environmentId: authenticatedEnvironment.id,
1068+
projectId: authenticatedEnvironment.projectId,
1069+
idempotencyKey,
1070+
idempotencyKeyExpiresAt: new Date(Date.now() + 200),
1071+
});
1072+
expect(sameWaitpoint.id).toBe(waitpoint.id);
1073+
1074+
//block the run
1075+
await engine.blockRunWithWaitpoint({
1076+
runId: run.id,
1077+
waitpoints: waitpoint.id,
1078+
environmentId: authenticatedEnvironment.id,
1079+
projectId: authenticatedEnvironment.projectId,
1080+
});
1081+
1082+
const executionData = await engine.getRunExecutionData({ runId: run.id });
1083+
expect(executionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
1084+
1085+
//check there is a waitpoint blocking the parent run
1086+
const runWaitpointBefore = await prisma.taskRunWaitpoint.findFirst({
1087+
where: {
1088+
taskRunId: run.id,
1089+
},
1090+
include: {
1091+
waitpoint: true,
1092+
},
1093+
});
1094+
expect(runWaitpointBefore?.waitpointId).toBe(waitpoint.id);
1095+
1096+
let event: EventBusEventArgs<"workerNotification">[0] | undefined = undefined;
1097+
engine.eventBus.on("workerNotification", (result) => {
1098+
event = result;
1099+
});
1100+
1101+
//complete the waitpoint
1102+
await engine.completeWaitpoint({
1103+
id: waitpoint.id,
1104+
});
1105+
1106+
await setTimeout(200);
1107+
1108+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
1109+
expect(executionData2?.snapshot.executionStatus).toBe("EXECUTING");
1110+
1111+
assertNonNullable(event);
1112+
const notificationEvent = event as EventBusEventArgs<"workerNotification">[0];
1113+
expect(notificationEvent.run.id).toBe(run.id);
1114+
1115+
//check there are no waitpoints blocking the parent run
1116+
const runWaitpoint = await prisma.taskRunWaitpoint.findFirst({
1117+
where: {
1118+
taskRunId: run.id,
1119+
},
1120+
include: {
1121+
waitpoint: true,
1122+
},
1123+
});
1124+
expect(runWaitpoint).toBeNull();
1125+
1126+
const waitpoint2 = await prisma.waitpoint.findUnique({
1127+
where: {
1128+
id: waitpoint.id,
1129+
},
1130+
});
1131+
assertNonNullable(waitpoint2);
1132+
expect(waitpoint2.status).toBe("COMPLETED");
1133+
expect(waitpoint2.outputIsError).toBe(false);
1134+
} finally {
1135+
engine.quit();
1136+
}
1137+
}
1138+
);
8201139
});

0 commit comments

Comments
 (0)