Skip to content

Commit b5ae558

Browse files
committed
add resolver id helper
1 parent 786416c commit b5ae558

File tree

1 file changed

+57
-36
lines changed

1 file changed

+57
-36
lines changed

packages/core/src/v3/runtime/sharedRuntimeManager.ts

Lines changed: 57 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { assertExhaustive } from "../../utils.js";
12
import { clock } from "../clock-api.js";
23
import { lifecycleHooks } from "../lifecycle-hooks-api.js";
34
import {
@@ -17,9 +18,7 @@ type Resolver = (value: CompletedWaitpoint) => void;
1718

1819
export class SharedRuntimeManager implements RuntimeManager {
1920
// Maps a resolver ID to a resolver function
20-
private readonly resolversByWaitId: Map<string, Resolver> = new Map();
21-
// Maps a waitpoint ID to a wait ID
22-
private readonly resolversByWaitpoint: Map<string, string> = new Map();
21+
private readonly resolversById: Map<string, Resolver> = new Map();
2322

2423
private _preventMultipleWaits = preventMultipleWaits();
2524

@@ -40,7 +39,7 @@ export class SharedRuntimeManager implements RuntimeManager {
4039
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
4140
return this._preventMultipleWaits(async () => {
4241
const promise = new Promise<CompletedWaitpoint>((resolve) => {
43-
this.resolversByWaitId.set(params.id, resolve);
42+
this.resolversById.set(params.id, resolve);
4443
});
4544

4645
await lifecycleHooks.callOnWaitHookListeners({
@@ -70,22 +69,21 @@ export class SharedRuntimeManager implements RuntimeManager {
7069
return Promise.resolve({ id: params.id, items: [] });
7170
}
7271

73-
const promise = Promise.all(
74-
Array.from({ length: params.runCount }, (_, index) => {
75-
const resolverId = `${params.id}_${index}`;
76-
return new Promise<CompletedWaitpoint>((resolve, reject) => {
77-
this.resolversByWaitId.set(resolverId, resolve);
78-
});
79-
})
80-
);
72+
const promises = Array.from({ length: params.runCount }, (_, index) => {
73+
const resolverId = `${params.id}_${index}`;
74+
75+
return new Promise<CompletedWaitpoint>((resolve, reject) => {
76+
this.resolversById.set(resolverId, resolve);
77+
});
78+
});
8179

8280
await lifecycleHooks.callOnWaitHookListeners({
8381
type: "batch",
8482
batchId: params.id,
8583
runCount: params.runCount,
8684
});
8785

88-
const waitpoints = await promise;
86+
const waitpoints = await Promise.all(promises);
8987

9088
await lifecycleHooks.callOnResumeHookListeners({
9189
type: "batch",
@@ -109,7 +107,7 @@ export class SharedRuntimeManager implements RuntimeManager {
109107
}): Promise<WaitpointTokenResult> {
110108
return this._preventMultipleWaits(async () => {
111109
const promise = new Promise<CompletedWaitpoint>((resolve) => {
112-
this.resolversByWaitId.set(waitpointFriendlyId, resolve);
110+
this.resolversById.set(waitpointFriendlyId, resolve);
113111
});
114112

115113
if (finishDate) {
@@ -150,36 +148,60 @@ export class SharedRuntimeManager implements RuntimeManager {
150148
await Promise.all(waitpoints.map((waitpoint) => this.resolveWaitpoint(waitpoint)));
151149
}
152150

151+
private resolverIdFromWaitpoint(waitpoint: CompletedWaitpoint): string | null {
152+
switch (waitpoint.type) {
153+
case "RUN": {
154+
if (!waitpoint.completedByTaskRun) {
155+
this.log("No completedByTaskRun for RUN waitpoint", waitpoint);
156+
return null;
157+
}
158+
159+
if (waitpoint.completedByTaskRun.batch) {
160+
// This run is part of a batch
161+
return `${waitpoint.completedByTaskRun.batch.friendlyId}_${waitpoint.index}`;
162+
} else {
163+
// This run is NOT part of a batch
164+
return waitpoint.completedByTaskRun.friendlyId;
165+
}
166+
}
167+
case "BATCH": {
168+
if (!waitpoint.completedByBatch) {
169+
this.log("No completedByBatch for BATCH waitpoint", waitpoint);
170+
return null;
171+
}
172+
173+
return waitpoint.completedByBatch.friendlyId;
174+
}
175+
case "MANUAL":
176+
case "DATETIME": {
177+
return waitpoint.friendlyId;
178+
}
179+
default: {
180+
assertExhaustive(waitpoint.type);
181+
}
182+
}
183+
}
184+
153185
private resolveWaitpoint(waitpoint: CompletedWaitpoint): void {
154186
this.log("resolveWaitpoint", waitpoint);
155187

156-
let waitId: string | undefined;
157-
158-
if (waitpoint.completedByTaskRun) {
159-
if (waitpoint.completedByTaskRun.batch) {
160-
waitId = `${waitpoint.completedByTaskRun.batch.friendlyId}_${waitpoint.index}`;
161-
} else {
162-
waitId = waitpoint.completedByTaskRun.friendlyId;
163-
}
164-
} else if (waitpoint.completedByBatch) {
165-
//no waitpoint resolves associated with batch completions
166-
//a batch completion isn't when all the runs from a batch are completed
188+
if (waitpoint.type === "BATCH") {
189+
// We currently ignore these, they're not required to resume after a batch completes
190+
this.log("Ignoring BATCH waitpoint", waitpoint);
167191
return;
168-
} else if (waitpoint.type === "MANUAL" || waitpoint.type === "DATETIME") {
169-
waitId = waitpoint.friendlyId;
170-
} else {
171-
waitId = this.resolversByWaitpoint.get(waitpoint.id);
172192
}
173193

174-
if (!waitId) {
175-
this.log("No waitId found for waitpoint", { ...this.status, ...waitpoint });
194+
const resolverId = this.resolverIdFromWaitpoint(waitpoint);
195+
196+
if (!resolverId) {
197+
this.log("No resolverId found for waitpoint", { ...this.status, ...waitpoint });
176198
return;
177199
}
178200

179-
const resolve = this.resolversByWaitId.get(waitId);
201+
const resolve = this.resolversById.get(resolverId);
180202

181203
if (!resolve) {
182-
this.log("No resolver found for waitId", { ...this.status, waitId });
204+
this.log("No resolver found for resolverId", { ...this.status, resolverId });
183205
return;
184206
}
185207

@@ -190,7 +212,7 @@ export class SharedRuntimeManager implements RuntimeManager {
190212

191213
resolve(waitpoint);
192214

193-
this.resolversByWaitId.delete(waitId);
215+
this.resolversById.delete(resolverId);
194216
}
195217

196218
private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult {
@@ -224,8 +246,7 @@ export class SharedRuntimeManager implements RuntimeManager {
224246

225247
private get status() {
226248
return {
227-
resolversbyWaitId: Array.from(this.resolversByWaitId.keys()),
228-
resolversByWaitpoint: Array.from(this.resolversByWaitpoint.keys()),
249+
resolversById: Array.from(this.resolversById.keys()),
229250
};
230251
}
231252
}

0 commit comments

Comments
 (0)