Skip to content

Commit 6766720

Browse files
committed
Created new test file for attempt failures
1 parent a06b7d1 commit 6766720

File tree

2 files changed

+169
-157
lines changed

2 files changed

+169
-157
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import {
2+
assertNonNullable,
3+
containerTest,
4+
setupAuthenticatedEnvironment,
5+
setupBackgroundWorker,
6+
} from "@internal/testcontainers";
7+
import { trace } from "@opentelemetry/api";
8+
import { expect } from "vitest";
9+
import { EventBusEventArgs } from "../eventBus.js";
10+
import { RunEngine } from "../index.js";
11+
12+
describe("RunEngine attempt failures", () => {
13+
containerTest(
14+
"Single run (retry attempt, then succeed)",
15+
{ timeout: 15_000 },
16+
async ({ prisma, redisOptions }) => {
17+
//create environment
18+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
19+
20+
const engine = new RunEngine({
21+
prisma,
22+
worker: {
23+
redis: redisOptions,
24+
workers: 1,
25+
tasksPerWorker: 10,
26+
pollIntervalMs: 100,
27+
},
28+
queue: {
29+
redis: redisOptions,
30+
},
31+
runLock: {
32+
redis: redisOptions,
33+
},
34+
machines: {
35+
defaultMachine: "small-1x",
36+
machines: {
37+
"small-1x": {
38+
name: "small-1x" as const,
39+
cpu: 0.5,
40+
memory: 0.5,
41+
centsPerMs: 0.0001,
42+
},
43+
},
44+
baseCostInCents: 0.0001,
45+
},
46+
tracer: trace.getTracer("test", "0.0.0"),
47+
});
48+
49+
try {
50+
const taskIdentifier = "test-task";
51+
52+
//create background worker
53+
const backgroundWorker = await setupBackgroundWorker(
54+
prisma,
55+
authenticatedEnvironment,
56+
taskIdentifier
57+
);
58+
59+
//trigger the run
60+
const run = await engine.trigger(
61+
{
62+
number: 1,
63+
friendlyId: "run_1234",
64+
environment: authenticatedEnvironment,
65+
taskIdentifier,
66+
payload: "{}",
67+
payloadType: "application/json",
68+
context: {},
69+
traceContext: {},
70+
traceId: "t12345",
71+
spanId: "s12345",
72+
masterQueue: "main",
73+
queueName: "task/test-task",
74+
isTest: false,
75+
tags: [],
76+
},
77+
prisma
78+
);
79+
80+
//dequeue the run
81+
const dequeued = await engine.dequeueFromMasterQueue({
82+
consumerId: "test_12345",
83+
masterQueue: run.masterQueue,
84+
maxRunCount: 10,
85+
});
86+
87+
//create an attempt
88+
const attemptResult = await engine.startRunAttempt({
89+
runId: dequeued[0].run.id,
90+
snapshotId: dequeued[0].snapshot.id,
91+
});
92+
93+
//fail the attempt
94+
const error = {
95+
type: "BUILT_IN_ERROR" as const,
96+
name: "UserError",
97+
message: "This is a user error",
98+
stackTrace: "Error: This is a user error\n at <anonymous>:1:1",
99+
};
100+
const result = await engine.completeRunAttempt({
101+
runId: dequeued[0].run.id,
102+
snapshotId: attemptResult.snapshot.id,
103+
completion: {
104+
ok: false,
105+
id: dequeued[0].run.id,
106+
error,
107+
retry: {
108+
timestamp: Date.now(),
109+
delay: 0,
110+
},
111+
},
112+
});
113+
expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY");
114+
expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING");
115+
expect(result.run.status).toBe("RETRYING_AFTER_FAILURE");
116+
117+
//state should be completed
118+
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
119+
assertNonNullable(executionData3);
120+
expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING");
121+
//only when the new attempt is created, should the attempt be increased
122+
expect(executionData3.run.attemptNumber).toBe(1);
123+
expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE");
124+
125+
//create a second attempt
126+
const attemptResult2 = await engine.startRunAttempt({
127+
runId: dequeued[0].run.id,
128+
snapshotId: executionData3.snapshot.id,
129+
});
130+
expect(attemptResult2.run.attemptNumber).toBe(2);
131+
132+
//now complete it successfully
133+
const result2 = await engine.completeRunAttempt({
134+
runId: dequeued[0].run.id,
135+
snapshotId: attemptResult2.snapshot.id,
136+
completion: {
137+
ok: true,
138+
id: dequeued[0].run.id,
139+
output: `{"foo":"bar"}`,
140+
outputType: "application/json",
141+
},
142+
});
143+
expect(result2.snapshot.executionStatus).toBe("FINISHED");
144+
expect(result2.run.attemptNumber).toBe(2);
145+
expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY");
146+
147+
//waitpoint should have been completed, with the output
148+
const runWaitpointAfter = await prisma.waitpoint.findMany({
149+
where: {
150+
completedByTaskRunId: run.id,
151+
},
152+
});
153+
expect(runWaitpointAfter.length).toBe(1);
154+
expect(runWaitpointAfter[0].type).toBe("RUN");
155+
expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`);
156+
expect(runWaitpointAfter[0].outputIsError).toBe(false);
157+
158+
//state should be completed
159+
const executionData4 = await engine.getRunExecutionData({ runId: run.id });
160+
assertNonNullable(executionData4);
161+
expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
162+
expect(executionData4.run.attemptNumber).toBe(2);
163+
expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY");
164+
} finally {
165+
engine.quit();
166+
}
167+
}
168+
);
169+
});

internal-packages/run-engine/src/engine/tests/trigger.test.ts

Lines changed: 0 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -330,161 +330,4 @@ describe("RunEngine trigger()", () => {
330330
engine.quit();
331331
}
332332
});
333-
334-
containerTest(
335-
"Single run (retry attempt, then succeed)",
336-
{ timeout: 15_000 },
337-
async ({ prisma, redisOptions }) => {
338-
//create environment
339-
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
340-
341-
const engine = new RunEngine({
342-
prisma,
343-
worker: {
344-
redis: redisOptions,
345-
workers: 1,
346-
tasksPerWorker: 10,
347-
pollIntervalMs: 100,
348-
},
349-
queue: {
350-
redis: redisOptions,
351-
},
352-
runLock: {
353-
redis: redisOptions,
354-
},
355-
machines: {
356-
defaultMachine: "small-1x",
357-
machines: {
358-
"small-1x": {
359-
name: "small-1x" as const,
360-
cpu: 0.5,
361-
memory: 0.5,
362-
centsPerMs: 0.0001,
363-
},
364-
},
365-
baseCostInCents: 0.0001,
366-
},
367-
tracer: trace.getTracer("test", "0.0.0"),
368-
});
369-
370-
try {
371-
const taskIdentifier = "test-task";
372-
373-
//create background worker
374-
const backgroundWorker = await setupBackgroundWorker(
375-
prisma,
376-
authenticatedEnvironment,
377-
taskIdentifier
378-
);
379-
380-
//trigger the run
381-
const run = await engine.trigger(
382-
{
383-
number: 1,
384-
friendlyId: "run_1234",
385-
environment: authenticatedEnvironment,
386-
taskIdentifier,
387-
payload: "{}",
388-
payloadType: "application/json",
389-
context: {},
390-
traceContext: {},
391-
traceId: "t12345",
392-
spanId: "s12345",
393-
masterQueue: "main",
394-
queueName: "task/test-task",
395-
isTest: false,
396-
tags: [],
397-
},
398-
prisma
399-
);
400-
401-
//dequeue the run
402-
const dequeued = await engine.dequeueFromMasterQueue({
403-
consumerId: "test_12345",
404-
masterQueue: run.masterQueue,
405-
maxRunCount: 10,
406-
});
407-
408-
//create an attempt
409-
const attemptResult = await engine.startRunAttempt({
410-
runId: dequeued[0].run.id,
411-
snapshotId: dequeued[0].snapshot.id,
412-
});
413-
414-
//fail the attempt
415-
const error = {
416-
type: "BUILT_IN_ERROR" as const,
417-
name: "UserError",
418-
message: "This is a user error",
419-
stackTrace: "Error: This is a user error\n at <anonymous>:1:1",
420-
};
421-
const result = await engine.completeRunAttempt({
422-
runId: dequeued[0].run.id,
423-
snapshotId: attemptResult.snapshot.id,
424-
completion: {
425-
ok: false,
426-
id: dequeued[0].run.id,
427-
error,
428-
retry: {
429-
timestamp: Date.now(),
430-
delay: 0,
431-
},
432-
},
433-
});
434-
expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY");
435-
expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING");
436-
expect(result.run.status).toBe("RETRYING_AFTER_FAILURE");
437-
438-
//state should be completed
439-
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
440-
assertNonNullable(executionData3);
441-
expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING");
442-
//only when the new attempt is created, should the attempt be increased
443-
expect(executionData3.run.attemptNumber).toBe(1);
444-
expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE");
445-
446-
//create a second attempt
447-
const attemptResult2 = await engine.startRunAttempt({
448-
runId: dequeued[0].run.id,
449-
snapshotId: executionData3.snapshot.id,
450-
});
451-
expect(attemptResult2.run.attemptNumber).toBe(2);
452-
453-
//now complete it successfully
454-
const result2 = await engine.completeRunAttempt({
455-
runId: dequeued[0].run.id,
456-
snapshotId: attemptResult2.snapshot.id,
457-
completion: {
458-
ok: true,
459-
id: dequeued[0].run.id,
460-
output: `{"foo":"bar"}`,
461-
outputType: "application/json",
462-
},
463-
});
464-
expect(result2.snapshot.executionStatus).toBe("FINISHED");
465-
expect(result2.run.attemptNumber).toBe(2);
466-
expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY");
467-
468-
//waitpoint should have been completed, with the output
469-
const runWaitpointAfter = await prisma.waitpoint.findMany({
470-
where: {
471-
completedByTaskRunId: run.id,
472-
},
473-
});
474-
expect(runWaitpointAfter.length).toBe(1);
475-
expect(runWaitpointAfter[0].type).toBe("RUN");
476-
expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`);
477-
expect(runWaitpointAfter[0].outputIsError).toBe(false);
478-
479-
//state should be completed
480-
const executionData4 = await engine.getRunExecutionData({ runId: run.id });
481-
assertNonNullable(executionData4);
482-
expect(executionData4.snapshot.executionStatus).toBe("FINISHED");
483-
expect(executionData4.run.attemptNumber).toBe(2);
484-
expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY");
485-
} finally {
486-
engine.quit();
487-
}
488-
}
489-
);
490333
});

0 commit comments

Comments
 (0)