Skip to content

Commit 1dafc5e

Browse files
committed
refactor: wip
1 parent 56052eb commit 1dafc5e

File tree

4 files changed

+87
-34
lines changed

4 files changed

+87
-34
lines changed

packages/utils/src/lib/profiler/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ export const PROFILER_ORIGIN_PID_ENV_VAR = 'CP_PROFILER_ORIGIN_PID';
44
export const PROFILER_DIRECTORY_ENV_VAR = 'CP_PROFILER_DIR';
55
export const PROFILER_BASE_NAME = 'trace';
66
export const PROFILER_DIRECTORY = './tmp/profiles';
7+
export const SHARDED_WAL_COORDINATOR_ID_ENV_VAR =
8+
'CP_SHARDED_WAL_COORDINATOR_ID';

packages/utils/src/lib/profiler/profiler.ts

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ import type {
1717
DevToolsColor,
1818
EntryMeta,
1919
} from '../user-timing-extensibility-api.type.js';
20+
import { ShardedWal, type WalFormat, WriteAheadLogFile } from '../wal';
2021
import { PROFILER_ENABLED_ENV_VAR } from './constants.js';
21-
import { type TraceEvent } from './trace-file.type.js';
2222

2323
/**
2424
* Generates a unique profiler ID based on performance time origin, process ID, thread ID, and instance count.
@@ -241,37 +241,28 @@ export class Profiler<T extends ActionTrackConfigs> {
241241
}
242242
}
243243

244-
// @TODO implement ShardedWAL
245-
type WalSink = {
246-
append: (event: TraceEvent) => void;
247-
open: () => void;
248-
close: () => void;
249-
isClosed: () => boolean;
244+
export type ProfilerPersistOptions<DomainObject extends string | object> = {
245+
format: WalFormat<DomainObject>;
250246
};
247+
export type NodeJsProfilerOptions<
248+
T extends ActionTrackConfigs,
249+
DomainObject extends string | object,
250+
> = ProfilerOptions<T> & ProfilerPersistOptions<DomainObject>;
251251

252-
export type NodeJsProfilerOptions<T extends ActionTrackConfigs> =
253-
ProfilerOptions<T> & {
254-
// @TODO implement WALFormat
255-
format: {
256-
encode: (v: string | object) => string;
257-
};
258-
};
259-
260-
export class NodeJsProfiler<T extends ActionTrackConfigs> extends Profiler<T> {
252+
export class NodeJsProfiler<
253+
T extends ActionTrackConfigs,
254+
DomainObject extends string | object,
255+
> extends Profiler<T> {
261256
#exitHandlerSubscription: null | (() => void) = null;
262-
protected sink: WalSink | null = null;
257+
#shardedWal: ShardedWal<DomainObject>;
258+
#sink: WriteAheadLogFile<DomainObject> | null = null;
263259

264-
constructor(options: NodeJsProfilerOptions<T>) {
265-
super(options);
266-
// Temporary dummy sink; replaced by real WAL implementation
267-
this.sink = {
268-
append: event => {
269-
options.format.encode(event);
270-
},
271-
open: () => void 0,
272-
close: () => void 0,
273-
isClosed: () => false,
274-
};
260+
constructor(options: NodeJsProfilerOptions<T, DomainObject>) {
261+
const { format, ...profilerOptons } = options;
262+
super(profilerOptons);
263+
264+
this.#shardedWal = new ShardedWal({ format });
265+
this.#sink = this.#shardedWal.shard();
275266
this.#exitHandlerSubscription = this.subscribeProcessExit();
276267
}
277268

@@ -317,12 +308,11 @@ export class NodeJsProfiler<T extends ActionTrackConfigs> extends Profiler<T> {
317308
* data is flushed and the WAL sink is properly closed.
318309
*/
319310
close(): void {
320-
if (!this.isEnabled()) {
321-
return;
322-
}
323-
this.setEnabled(false);
324311
this.#exitHandlerSubscription?.();
325312
this.#exitHandlerSubscription = null;
326-
this.sink?.close();
313+
this.setEnabled(false);
314+
this.#sink?.close();
315+
this.#sink = null;
316+
this.#shardedWal.finalizeIfCoordinator();
327317
}
328318
}

packages/utils/src/lib/profiler/profiler.unit.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,4 +574,18 @@ describe('NodeJsProfiler', () => {
574574
expect(setEnabledSpy).toHaveBeenCalledTimes(1);
575575
expect(setEnabledSpy).toHaveBeenCalledWith(false);
576576
});
577+
578+
it('close() unsubscribes from exit handlers even when disabled', () => {
579+
const unsubscribeFn = vi.fn();
580+
mockSubscribeProcessExit.mockReturnValue(unsubscribeFn);
581+
582+
profiler = createProfiler({ enabled: false });
583+
expect(profiler.isEnabled()).toBe(false);
584+
expect(mockSubscribeProcessExit).toHaveBeenCalled();
585+
586+
profiler.close();
587+
588+
expect(unsubscribeFn).toHaveBeenCalledTimes(1);
589+
expect(profiler.isEnabled()).toBe(false);
590+
});
577591
});

packages/utils/src/lib/wal.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as fs from 'node:fs';
33
import path from 'node:path';
44
import process from 'node:process';
55
import { threadId } from 'node:worker_threads';
6+
import { SHARDED_WAL_COORDINATOR_ID_ENV_VAR } from './profiler/constants.js';
67

78
/**
89
* Codec for encoding/decoding values to/from strings for WAL storage.
@@ -356,6 +357,14 @@ export function setLeaderWal(envVarName: string, profilerID: string): void {
356357
// eslint-disable-next-line functional/no-let
357358
let shardCount = 0;
358359

360+
/**
361+
* Generates a unique sharded WAL ID based on performance time origin, process ID, thread ID, and instance count.
362+
*/
363+
function getShardedWalId() {
364+
// eslint-disable-next-line functional/immutable-data
365+
return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++ShardedWal.instanceCount}`;
366+
}
367+
359368
/**
360369
* Generates a human-readable shard ID.
361370
* This ID is unique per process/thread/shard combination and used in the file name.
@@ -462,24 +471,52 @@ export function getShardedFinalPath<T extends object | string = object>(opt: {
462471
*/
463472

464473
export class ShardedWal<T extends object | string = object> {
474+
static instanceCount = 0;
475+
readonly #id: string = getShardedWalId();
465476
readonly groupId = getShardedGroupId();
466477
readonly #format: WalFormat<T>;
467478
readonly #dir: string = process.cwd();
479+
readonly #isCoordinator: boolean;
480+
#finalized = false;
468481

469482
/**
470483
* Create a sharded WAL manager.
484+
*
485+
* @param opt.dir - Base directory to store shard files (defaults to process.cwd())
486+
* @param opt.format - WAL format configuration
487+
* @param opt.groupId - Group ID for sharding (defaults to generated group ID)
488+
* @param opt.coordinatorIdEnvVar - Environment variable name for storing coordinator ID (defaults to CP_SHARDED_WAL_COORDINATOR_ID)
471489
*/
472490
constructor(opt: {
473491
dir?: string;
474492
format: Partial<WalFormat<T>>;
475493
groupId?: string;
494+
coordinatorIdEnvVar?: string;
476495
}) {
477-
const { dir, format, groupId } = opt;
496+
const {
497+
dir,
498+
format,
499+
groupId,
500+
coordinatorIdEnvVar = SHARDED_WAL_COORDINATOR_ID_ENV_VAR,
501+
} = opt;
478502
this.groupId = groupId ?? getShardedGroupId();
479503
if (dir) {
480504
this.#dir = dir;
481505
}
482506
this.#format = parseWalFormat<T>(format);
507+
this.#isCoordinator = isLeaderWal(coordinatorIdEnvVar, this.#id);
508+
}
509+
510+
/**
511+
* Is this instance the coordinator?
512+
*
513+
* Coordinator status is determined from the coordinatorIdEnvVar environment variable.
514+
* The coordinator handles finalization and cleanup of shard files.
515+
*
516+
* @returns true if this instance is the coordinator, false otherwise
517+
*/
518+
isCoordinator(): boolean {
519+
return this.#isCoordinator;
483520
}
484521

485522
shard(shardId: string = getShardId()) {
@@ -570,4 +607,14 @@ export class ShardedWal<T extends object | string = object> {
570607
// Directory might not be empty or already removed, ignore
571608
}
572609
}
610+
611+
finalizeIfCoordinator(): void {
612+
if (this.#finalized) return;
613+
this.#finalized = true;
614+
615+
if (!this.isCoordinator()) return;
616+
617+
this.finalize();
618+
this.cleanup();
619+
}
573620
}

0 commit comments

Comments
 (0)