Skip to content

Commit bb9ac50

Browse files
committed
runtime suspendable state and improved logs
1 parent 5f52292 commit bb9ac50

File tree

3 files changed

+66
-40
lines changed

3 files changed

+66
-40
lines changed

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ export type OnSendDebugLogMessage = InferSocketMessageSchema<
3838
"SEND_DEBUG_LOG"
3939
>;
4040

41+
export type OnSetSuspendableMessage = InferSocketMessageSchema<
42+
typeof ExecutorToWorkerMessageCatalog,
43+
"SET_SUSPENDABLE"
44+
>;
45+
4146
export type TaskRunProcessOptions = {
4247
workerManifest: WorkerManifest;
4348
serverWorker: ServerBackgroundWorker;
@@ -74,6 +79,7 @@ export class TaskRunProcess {
7479
new Evt();
7580
public onIsBeingKilled: Evt<TaskRunProcess> = new Evt();
7681
public onSendDebugLog: Evt<OnSendDebugLogMessage> = new Evt();
82+
public onSetSuspendable: Evt<OnSetSuspendableMessage> = new Evt();
7783

7884
private _isPreparedForNextRun: boolean = false;
7985
private _isPreparedForNextAttempt: boolean = false;
@@ -187,6 +193,9 @@ export class TaskRunProcess {
187193
SEND_DEBUG_LOG: async (message) => {
188194
this.onSendDebugLog.post(message);
189195
},
196+
SET_SUSPENDABLE: async (message) => {
197+
this.onSetSuspendable.post(message);
198+
},
190199
},
191200
});
192201

packages/core/src/v3/runtime/sharedRuntimeManager.ts

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { assertExhaustive } from "../../utils.js";
22
import { clock } from "../clock-api.js";
33
import { lifecycleHooks } from "../lifecycle-hooks-api.js";
4-
import { DebugLogProperties } from "../runEngineWorker/index.js";
4+
import { DebugLogPropertiesInput } from "../runEngineWorker/index.js";
55
import {
66
BatchTaskRunExecutionResult,
77
CompletedWaitpoint,
@@ -11,6 +11,7 @@ import {
1111
TaskRunSuccessfulExecutionResult,
1212
WaitpointTokenResult,
1313
} from "../schemas/index.js";
14+
import { tryCatch } from "../tryCatch.js";
1415
import { ExecutorToWorkerProcessConnection } from "../zodIpc.js";
1516
import { RuntimeManager } from "./manager.js";
1617
import { preventMultipleWaits } from "./preventMultipleWaits.js";
@@ -58,9 +59,7 @@ export class SharedRuntimeManager implements RuntimeManager {
5859
runId: params.id,
5960
});
6061

61-
// FIXME: We need to send a "ready to checkpoint" signal here
62-
63-
const waitpoint = await promise;
62+
const waitpoint = await this.suspendable(promise);
6463
const result = this.waitpointToTaskRunExecutionResult(waitpoint);
6564

6665
await lifecycleHooks.callOnResumeHookListeners({
@@ -99,9 +98,7 @@ export class SharedRuntimeManager implements RuntimeManager {
9998
runCount: params.runCount,
10099
});
101100

102-
// FIXME: We need to send a "ready to checkpoint" signal here
103-
104-
const waitpoints = await Promise.all(promises);
101+
const waitpoints = await this.suspendable(Promise.all(promises));
105102

106103
await lifecycleHooks.callOnResumeHookListeners({
107104
type: "batch",
@@ -143,9 +140,7 @@ export class SharedRuntimeManager implements RuntimeManager {
143140
});
144141
}
145142

146-
// FIXME: We need to send a "ready to checkpoint" signal here
147-
148-
const waitpoint = await promise;
143+
const waitpoint = await this.suspendable(promise);
149144

150145
if (finishDate) {
151146
await lifecycleHooks.callOnResumeHookListeners({
@@ -177,10 +172,9 @@ export class SharedRuntimeManager implements RuntimeManager {
177172
switch (waitpoint.type) {
178173
case "RUN": {
179174
if (!waitpoint.completedByTaskRun) {
180-
this.debugLog(
181-
"No completedByTaskRun for RUN waitpoint",
182-
this.waitpointForDebugLog(waitpoint)
183-
);
175+
this.debugLog("no completedByTaskRun for RUN waitpoint", {
176+
waitpoint: this.waitpointForDebugLog(waitpoint),
177+
});
184178
return null;
185179
}
186180

@@ -196,10 +190,9 @@ export class SharedRuntimeManager implements RuntimeManager {
196190
}
197191
case "BATCH": {
198192
if (!waitpoint.completedByBatch) {
199-
this.debugLog(
200-
"No completedByBatch for BATCH waitpoint",
201-
this.waitpointForDebugLog(waitpoint)
202-
);
193+
this.debugLog("no completedByBatch for BATCH waitpoint", {
194+
waitpoint: this.waitpointForDebugLog(waitpoint),
195+
});
203196
return null;
204197
}
205198

@@ -225,14 +218,18 @@ export class SharedRuntimeManager implements RuntimeManager {
225218

226219
if (waitpoint.type === "BATCH") {
227220
// We currently ignore these, they're not required to resume after a batch completes
228-
this.debugLog("Ignoring BATCH waitpoint", this.waitpointForDebugLog(waitpoint));
221+
this.debugLog("ignoring BATCH waitpoint", {
222+
waitpoint: this.waitpointForDebugLog(waitpoint),
223+
});
229224
return;
230225
}
231226

232227
resolverId = resolverId ?? this.resolverIdFromWaitpoint(waitpoint);
233228

234229
if (!resolverId) {
235-
this.debugLog("No resolverId for waitpoint", this.waitpointForDebugLog(waitpoint));
230+
this.debugLog("no resolverId for waitpoint", {
231+
waitpoint: this.waitpointForDebugLog(waitpoint),
232+
});
236233

237234
// No need to store the waitpoint, we'll never be able to resolve it
238235
return;
@@ -241,7 +238,10 @@ export class SharedRuntimeManager implements RuntimeManager {
241238
const resolve = this.resolversById.get(resolverId);
242239

243240
if (!resolve) {
244-
this.debugLog("No resolver found for resolverId", { ...this.status, resolverId });
241+
this.debugLog("no resolver found for resolverId", {
242+
resolverId,
243+
waitpoint: this.waitpointForDebugLog(waitpoint),
244+
});
245245

246246
// Store the waitpoint for later if we can't find a resolver
247247
this.waitpointsByResolverId.set(resolverId, waitpoint);
@@ -264,6 +264,23 @@ export class SharedRuntimeManager implements RuntimeManager {
264264
}
265265
}
266266

267+
private setSuspendable(suspendable: boolean): void {
268+
this.ipc.send("SET_SUSPENDABLE", { suspendable });
269+
}
270+
271+
private async suspendable<T>(promise: Promise<T>): Promise<T> {
272+
this.setSuspendable(true);
273+
const [error, result] = await tryCatch(promise);
274+
this.setSuspendable(false);
275+
276+
if (error) {
277+
this.debugLog("error in suspendable wrapper", { error: String(error) });
278+
throw error;
279+
}
280+
281+
return result;
282+
}
283+
267284
private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult {
268285
if (!waitpoint.completedByTaskRun?.friendlyId) throw new Error("Missing completedByTaskRun");
269286

@@ -288,39 +305,33 @@ export class SharedRuntimeManager implements RuntimeManager {
288305
}
289306
}
290307

291-
private waitpointForDebugLog(waitpoint: CompletedWaitpoint): DebugLogProperties {
292-
const {
293-
completedByTaskRun,
294-
completedByBatch,
295-
completedAfter,
296-
completedAt,
297-
output,
298-
id,
299-
...rest
300-
} = waitpoint;
308+
private waitpointForDebugLog(waitpoint: CompletedWaitpoint): DebugLogPropertiesInput {
309+
const { completedAfter, completedAt, output, ...rest } = waitpoint;
301310

302311
return {
303312
...rest,
304-
waitpointId: id,
305313
output: output?.slice(0, 100),
306-
completedByTaskRunId: completedByTaskRun?.id,
307-
completedByTaskRunBatchId: completedByTaskRun?.batch?.id,
308-
completedByBatchId: completedByBatch?.id,
309314
completedAfter: completedAfter?.toISOString(),
310315
completedAt: completedAt?.toISOString(),
316+
completedAfterDate: completedAfter,
317+
completedAtDate: completedAt,
311318
};
312319
}
313320

314-
private debugLog(message: string, properties?: DebugLogProperties) {
315-
const status = this.status;
316-
321+
private debugLog(message: string, properties?: DebugLogPropertiesInput) {
317322
if (this.showLogs) {
318-
console.log(`[${new Date().toISOString()}] ${message}`, { ...status, ...properties });
323+
console.log(`[${new Date().toISOString()}] ${message}`, {
324+
runtimeStatus: this.status,
325+
...properties,
326+
});
319327
}
320328

321329
this.ipc.send("SEND_DEBUG_LOG", {
322330
message,
323-
properties: { ...status, ...properties },
331+
properties: {
332+
runtimeStatus: this.status,
333+
...properties,
334+
},
324335
});
325336
}
326337

packages/core/src/v3/schemas/messages.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,12 @@ export const ExecutorToWorkerMessageCatalog = {
182182
properties: DebugLogPropertiesInput.optional(),
183183
}),
184184
},
185+
SET_SUSPENDABLE: {
186+
message: z.object({
187+
version: z.literal("v1").default("v1"),
188+
suspendable: z.boolean(),
189+
}),
190+
},
185191
};
186192

187193
export const WorkerToExecutorMessageCatalog = {

0 commit comments

Comments
 (0)