Skip to content

Commit e60ea2b

Browse files
committed
refactor: wip
1 parent 29016b0 commit e60ea2b

File tree

4 files changed

+154
-52
lines changed

4 files changed

+154
-52
lines changed

packages/utils/src/lib/profiler/profiler.ts

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import path from 'node:path';
2+
import { performance } from 'node:perf_hooks';
13
import type { PerformanceEntry } from 'node:perf_hooks';
24
import process from 'node:process';
35
import { threadId } from 'node:worker_threads';
@@ -26,7 +28,14 @@ import {
2628
import { entryToTraceEvents } from './trace-file-utils.js';
2729
import type { UserTimingTraceEvent } from './trace-file.type.js';
2830
import { traceEventWalFormat } from './wal-json-trace.js';
29-
import { ShardedWal, WriteAheadLogFile } from './wal.js';
31+
import {
32+
ShardedWal,
33+
WriteAheadLogFile,
34+
getShardId,
35+
getShardedGroupId,
36+
isLeaderWal,
37+
setLeaderWal,
38+
} from './wal.js';
3039
import type { WalFormat } from './wal.js';
3140

3241
/**
@@ -80,11 +89,6 @@ export class Profiler<T extends ActionTrackConfigs> {
8089
*
8190
*/
8291
constructor(options: ProfilerOptions<T>) {
83-
// Initialize origin PID early - must happen before user code runs
84-
if (!process.env[PROFILER_ORIGIN_PID_ENV_VAR]) {
85-
process.env[PROFILER_ORIGIN_PID_ENV_VAR] = String(process.pid);
86-
}
87-
8892
const { tracks, prefix, enabled, ...defaults } = options;
8993
const dataType = 'track-entry';
9094

@@ -113,6 +117,13 @@ export class Profiler<T extends ActionTrackConfigs> {
113117
this.#enabled = enabled;
114118
}
115119

120+
/**
121+
* Close the profiler. Subclasses should override this to perform cleanup.
122+
*/
123+
close(): void {
124+
// Base implementation does nothing
125+
}
126+
116127
/**
117128
* Is profiling enabled?
118129
*
@@ -235,74 +246,103 @@ export class Profiler<T extends ActionTrackConfigs> {
235246
}
236247
}
237248

238-
/**
239-
* Determines if this process is the leader WAL process using the origin PID heuristic.
240-
*
241-
* The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID).
242-
* All descendant processes inherit the environment but have different PIDs.
243-
*
244-
* @returns true if this is the leader WAL process, false otherwise
245-
*/
246-
export function isLeaderWal(): boolean {
247-
return process.env[PROFILER_ORIGIN_PID_ENV_VAR] === String(process.pid);
248-
}
249-
250249
export class NodeProfiler<
251250
TracksConfig extends ActionTrackConfigs = ActionTrackConfigs,
252-
CodecOutput extends string | object = UserTimingTraceEvent,
251+
CodecOutput extends UserTimingTraceEvent = UserTimingTraceEvent,
253252
> extends Profiler<TracksConfig> {
254253
#shard: WriteAheadLogFile<CodecOutput>;
255254
#perfObserver: PerformanceObserverSink<CodecOutput>;
256255
#shardWal: ShardedWal<CodecOutput>;
257256
readonly #format: WalFormat<CodecOutput>;
257+
readonly #debug: boolean;
258+
#closed: boolean = false;
259+
258260
constructor(
259261
options: ProfilerOptions<TracksConfig> & {
260262
directory?: string;
261263
performanceEntryEncode: (entry: PerformanceEntry) => CodecOutput[];
262-
format: WalFormat<CodecOutput>;
264+
debug?: boolean;
263265
},
264266
) {
267+
// Initialize origin PID early - must happen before user code runs
268+
setLeaderWal(PROFILER_ORIGIN_PID_ENV_VAR);
269+
265270
const {
266271
directory = PROFILER_DIRECTORY,
267272
performanceEntryEncode,
268-
format,
273+
debug = false,
274+
...profilerOptions
269275
} = options;
270-
super(options);
271-
const shardId = `${process.pid}-${threadId}`;
276+
super(profilerOptions);
277+
const walGroupId = getShardedGroupId();
278+
const shardId = getShardId(process.pid, threadId);
272279

273-
this.#format = format;
274-
this.#shardWal = new ShardedWal(directory, format);
280+
this.#format = traceEventWalFormat({ groupId: walGroupId });
281+
this.#debug = debug;
282+
this.#shardWal = new ShardedWal(
283+
path.join(directory, walGroupId),
284+
this.#format,
285+
);
275286
this.#shard = this.#shardWal.shard(shardId);
276287

277288
this.#perfObserver = new PerformanceObserverSink({
278289
sink: this.#shard,
279290
encode: performanceEntryEncode,
280291
buffered: true,
281-
flushThreshold: 100,
292+
flushThreshold: 1, // Lower threshold for immediate flushing
282293
});
283294

295+
this.#perfObserver.subscribe();
296+
284297
installExitHandlers({
285298
onExit: () => {
286-
this.#perfObserver.flush();
287-
this.#perfObserver.unsubscribe();
288-
this.#shard.close();
289-
if (isLeaderWal()) {
290-
this.#shardWal.finalize();
291-
this.#shardWal.cleanup();
292-
}
299+
this.close();
293300
},
294301
});
295302
}
296303

297304
getFinalPath() {
298305
return this.#format.finalPath();
299306
}
307+
308+
/**
309+
* Close the profiler and finalize files if this is the leader process.
310+
* This method can be called manually to ensure proper cleanup.
311+
*/
312+
close(): void {
313+
if (this.#closed) {
314+
return;
315+
}
316+
317+
this.#closed = true;
318+
319+
try {
320+
if (!this.#perfObserver || !this.#shard || !this.#shardWal) {
321+
console.warn('Warning: Profiler not fully initialized during close');
322+
return;
323+
}
324+
325+
this.#perfObserver.flush();
326+
this.#perfObserver.unsubscribe();
327+
328+
this.#shard.close();
329+
330+
if (isLeaderWal(PROFILER_ORIGIN_PID_ENV_VAR)) {
331+
this.#shardWal.finalize();
332+
if (!this.#debug) {
333+
this.#shardWal.cleanup();
334+
}
335+
}
336+
} catch (error) {
337+
console.warn('Warning: Error during profiler close:', error);
338+
}
339+
}
300340
}
301341

302342
export const profiler = new NodeProfiler({
303343
prefix: 'cp',
304344
track: 'CLI',
305345
trackGroup: 'Code Pushup',
306346
performanceEntryEncode: entryToTraceEvents,
307-
format: traceEventWalFormat(),
347+
debug: process.env.CP_PROFILER_DEBUG === 'true',
308348
});

packages/utils/src/lib/profiler/trace-file-utils.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@ export const measureToSpanEvents = (
263263
* @param entry - Performance entry
264264
* @returns UserTimingTraceEvent[]
265265
*/
266-
export function entryToTraceEvents(entry: PerformanceEntry) {
266+
export function entryToTraceEvents(
267+
entry: PerformanceEntry,
268+
): UserTimingTraceEvent[] {
267269
if (entry.entryType === 'mark') {
268270
return [markToInstantEvent(entry as PerformanceMark)];
269271
}

packages/utils/src/lib/profiler/wal-json-trace.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ export function generateTraceContent(
7171
*/
7272
export const traceEventWalFormat = <
7373
T extends UserTimingTraceEvent = UserTimingTraceEvent,
74-
>(_opt?: {
74+
>(opt?: {
7575
dir?: string;
76+
groupId?: string;
7677
}) => {
7778
const baseName = 'trace';
7879
const walExtension = '.jsonl';
7980
const finalExtension = '.json';
81+
const groupId = opt?.groupId || 'default';
8082
return {
8183
baseName,
8284
walExtension,
@@ -85,8 +87,8 @@ export const traceEventWalFormat = <
8587
encode: event => JSON.stringify(encodeTraceEvent(event)),
8688
decode: (json: string) => decodeTraceEvent(JSON.parse(json)) as T,
8789
},
88-
shardPath: (id: string) => `${baseName}.${id}${walExtension}`,
89-
finalPath: () => `${baseName}${finalExtension}`,
90+
shardPath: (id: string) => `${baseName}.${groupId}.${id}${walExtension}`,
91+
finalPath: () => `${baseName}.${groupId}${finalExtension}`,
9092
finalizer: (records, metadata) => generateTraceContent(records, metadata),
9193
} satisfies WalFormat<T>;
9294
};

packages/utils/src/lib/profiler/wal.ts

Lines changed: 72 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import * as fs from 'node:fs';
22
import path from 'node:path';
3+
import process from 'node:process';
4+
import { PROFILER_ORIGIN_PID_ENV_VAR } from './constants';
35

46
/**
57
* Codec for encoding/decoding values to/from strings for WAL storage.
@@ -153,13 +155,15 @@ export class WriteAheadLogFile<T> {
153155
/**
154156
* Append a record to the WAL.
155157
* @param v - Record to append
156-
* @throws Error if WAL is not opened
158+
* @throws Error if WAL cannot be opened
157159
*/
158160
append = (v: T) => {
159161
if (!this.#fd) {
160-
throw new Error('WAL not opened');
162+
this.open();
163+
}
164+
if (this.#fd) {
165+
fs.writeSync(this.#fd, `${this.#encode(v)}\n`);
161166
}
162-
fs.writeSync(this.#fd, `${this.#encode(v)}\n`);
163167
};
164168

165169
/** Close the WAL file */
@@ -278,6 +282,29 @@ export function parseWalFormat<T extends object | string = object>(
278282
} satisfies WalFormat<T>;
279283
}
280284

285+
/**
286+
* Determines if this process is the leader WAL process using the origin PID heuristic.
287+
*
288+
* The leader is the process that first enabled profiling (the one that set CP_PROFILER_ORIGIN_PID).
289+
* All descendant processes inherit the environment but have different PIDs.
290+
*
291+
* @returns true if this is the leader WAL process, false otherwise
292+
*/
293+
export function isLeaderWal(envVarName: string): boolean {
294+
return process.env[envVarName] === String(process.pid);
295+
}
296+
297+
/**
298+
* Initialize the origin PID environment variable if not already set.
299+
* This must be done as early as possible before any user code runs.
300+
* Set's PROFILER_ORIGIN_PID_ENV_VAR to the current process PID if not already defined.
301+
*/
302+
export function setLeaderWal(PROFILER_ORIGIN_PID_ENV_VAR: string): void {
303+
if (!process.env[PROFILER_ORIGIN_PID_ENV_VAR]) {
304+
process.env[PROFILER_ORIGIN_PID_ENV_VAR] = String(process.pid);
305+
}
306+
}
307+
281308
/**
282309
* Sharded Write-Ahead Log manager for coordinating multiple WAL shards.
283310
* Handles distributed logging across multiple processes/files with atomic finalization.
@@ -304,16 +331,21 @@ export class ShardedWal<T extends object | string = object> {
304331

305332
/** Get all shard file paths matching this WAL's base name */
306333
private shardFiles() {
307-
return fs.existsSync(this.#dir)
308-
? fs
309-
.readdirSync(this.#dir)
310-
.filter(
311-
f =>
312-
f.startsWith(`${this.#format.baseName}.`) &&
313-
f.endsWith(this.#format.walExtension),
314-
)
315-
.map(f => path.join(this.#dir, f))
316-
: [];
334+
if (!fs.existsSync(this.#dir)) {
335+
return [];
336+
}
337+
338+
const files: string[] = [];
339+
const entries = fs.readdirSync(this.#dir);
340+
341+
for (const entry of entries) {
342+
// Look for files matching the pattern: anything ending with .jsonl
343+
if (entry.endsWith(this.#format.walExtension)) {
344+
files.push(path.join(this.#dir, entry));
345+
}
346+
}
347+
348+
return files;
317349
}
318350

319351
/**
@@ -351,6 +383,32 @@ export class ShardedWal<T extends object | string = object> {
351383
}
352384

353385
cleanup() {
354-
this.shardFiles().forEach(f => fs.unlinkSync(f));
386+
this.shardFiles().forEach(f => {
387+
// Remove the shard file
388+
fs.unlinkSync(f);
389+
// Remove the parent directory (shard group directory)
390+
const shardDir = path.dirname(f);
391+
try {
392+
fs.rmdirSync(shardDir);
393+
} catch (error) {
394+
// Directory might not be empty or already removed, ignore
395+
}
396+
});
355397
}
356398
}
399+
400+
/**
401+
* Generates a shard ID.
402+
* This is idempotent since PID and TID are fixed for the process/thread.
403+
*/
404+
export function getShardId(pid: number, tid: number = 0): string {
405+
return `${pid}-${tid}`;
406+
}
407+
408+
/**
409+
* Generates a sharded group ID based on performance.timeOrigin.
410+
* This is idempotent per process since timeOrigin is fixed within a process and its worker.
411+
*/
412+
export function getShardedGroupId(): string {
413+
return Math.floor(performance.timeOrigin).toString();
414+
}

0 commit comments

Comments
 (0)