Skip to content

Commit 4d46d20

Browse files
committed
store and correctly resolve waipoints that come in early
1 parent b5ae558 commit 4d46d20

File tree

1 file changed

+31
-7
lines changed

1 file changed

+31
-7
lines changed

packages/core/src/v3/runtime/sharedRuntimeManager.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ import { preventMultipleWaits } from "./preventMultipleWaits.js";
1717
type Resolver = (value: CompletedWaitpoint) => void;
1818

1919
export class SharedRuntimeManager implements RuntimeManager {
20-
// Maps a resolver ID to a resolver function
21-
private readonly resolversById: Map<string, Resolver> = new Map();
20+
/** Maps a resolver ID to a resolver function */
21+
private readonly resolversById = new Map<string, Resolver>();
22+
23+
/** Stores waitpoints that arrive before their resolvers have been created */
24+
private readonly waitpointsByResolverId = new Map<string, CompletedWaitpoint>();
2225

2326
private _preventMultipleWaits = preventMultipleWaits();
2427

@@ -42,6 +45,9 @@ export class SharedRuntimeManager implements RuntimeManager {
4245
this.resolversById.set(params.id, resolve);
4346
});
4447

48+
// Resolve any waitpoints we received before the resolver was created
49+
this.resolvePendingWaitpoints();
50+
4551
await lifecycleHooks.callOnWaitHookListeners({
4652
type: "task",
4753
runId: params.id,
@@ -77,6 +83,9 @@ export class SharedRuntimeManager implements RuntimeManager {
7783
});
7884
});
7985

86+
// Resolve any waitpoints we received before the resolvers were created
87+
this.resolvePendingWaitpoints();
88+
8089
await lifecycleHooks.callOnWaitHookListeners({
8190
type: "batch",
8291
batchId: params.id,
@@ -110,6 +119,9 @@ export class SharedRuntimeManager implements RuntimeManager {
110119
this.resolversById.set(waitpointFriendlyId, resolve);
111120
});
112121

122+
// Resolve any waitpoints we received before the resolver was created
123+
this.resolvePendingWaitpoints();
124+
113125
if (finishDate) {
114126
await lifecycleHooks.callOnWaitHookListeners({
115127
type: "duration",
@@ -182,7 +194,7 @@ export class SharedRuntimeManager implements RuntimeManager {
182194
}
183195
}
184196

185-
private resolveWaitpoint(waitpoint: CompletedWaitpoint): void {
197+
private resolveWaitpoint(waitpoint: CompletedWaitpoint, resolverId?: string | null): void {
186198
this.log("resolveWaitpoint", waitpoint);
187199

188200
if (waitpoint.type === "BATCH") {
@@ -191,28 +203,39 @@ export class SharedRuntimeManager implements RuntimeManager {
191203
return;
192204
}
193205

194-
const resolverId = this.resolverIdFromWaitpoint(waitpoint);
206+
resolverId = resolverId ?? this.resolverIdFromWaitpoint(waitpoint);
195207

196208
if (!resolverId) {
197-
this.log("No resolverId found for waitpoint", { ...this.status, ...waitpoint });
209+
this.log("No resolverId for waitpoint", { ...this.status, ...waitpoint });
210+
211+
// No need to store the waitpoint, we'll never be able to resolve it
198212
return;
199213
}
200214

201215
const resolve = this.resolversById.get(resolverId);
202216

203217
if (!resolve) {
204218
this.log("No resolver found for resolverId", { ...this.status, resolverId });
219+
220+
// Store the waitpoint for later if we can't find a resolver
221+
this.waitpointsByResolverId.set(resolverId, waitpoint);
222+
205223
return;
206224
}
207225

208-
this.log("Resolving waitpoint", waitpoint);
209-
210226
// Ensure current time is accurate before resolving the waitpoint
211227
clock.reset();
212228

213229
resolve(waitpoint);
214230

215231
this.resolversById.delete(resolverId);
232+
this.waitpointsByResolverId.delete(resolverId);
233+
}
234+
235+
private resolvePendingWaitpoints(): void {
236+
for (const [resolverId, waitpoint] of this.waitpointsByResolverId.entries()) {
237+
this.resolveWaitpoint(waitpoint, resolverId);
238+
}
216239
}
217240

218241
private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult {
@@ -247,6 +270,7 @@ export class SharedRuntimeManager implements RuntimeManager {
247270
private get status() {
248271
return {
249272
resolversById: Array.from(this.resolversById.keys()),
273+
waitpointsByResolverId: Array.from(this.waitpointsByResolverId.keys()),
250274
};
251275
}
252276
}

0 commit comments

Comments
 (0)