Skip to content

Commit a530134

Browse files
committed
refactor: wip
1 parent bc86d08 commit a530134

File tree

6 files changed

+382
-203
lines changed

6 files changed

+382
-203
lines changed

packages/utils/src/lib/clock-epoch.unit.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { defaultClock, epochClock } from './clock-epoch.js';
44
describe('epochClock', () => {
55
it('should create epoch clock with defaults', () => {
66
const c = epochClock();
7-
expect(c.timeOriginMs).toBe(500_000);
7+
expect(c.timeOriginMs).toBe(1_700_000_000_000);
88
expect(c.tid).toBe(2);
99
expect(c.pid).toBe(10_001);
1010
expect(c.fromEpochMs).toBeFunction();
@@ -33,8 +33,8 @@ describe('epochClock', () => {
3333

3434
it('should support performance clock by default for epochNowUs', () => {
3535
const c = epochClock();
36-
expect(c.timeOriginMs).toBe(500_000);
37-
expect(c.epochNowUs()).toBe(500_000_000);
36+
expect(c.timeOriginMs).toBe(1_700_000_000_000);
37+
expect(c.epochNowUs()).toBe(1_700_000_000_000_000);
3838
});
3939

4040
it.each([
@@ -56,8 +56,8 @@ describe('epochClock', () => {
5656
});
5757

5858
it.each([
59-
[0, 500_000_000],
60-
[1000, 501_000_000],
59+
[0, 1_700_000_000_000_000],
60+
[1000, 1_700_000_001_000_000],
6161
])(
6262
'should convert performance milliseconds to microseconds',
6363
(perfMs, expected) => {

packages/utils/src/lib/profiler/profiler.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import process from 'node:process';
2+
import { threadId } from 'node:worker_threads';
23
import { isEnvVarEnabled } from '../env.js';
34
import {
45
type ActionTrackConfigs,
@@ -16,6 +17,14 @@ import type {
1617
} from '../user-timing-extensibility-api.type.js';
1718
import { PROFILER_ENABLED_ENV_VAR } from './constants.js';
1819

20+
/**
21+
* Generates a unique profiler ID based on performance time origin, process ID, thread ID, and instance count.
22+
*/
23+
export function getProfilerId() {
24+
// eslint-disable-next-line functional/immutable-data
25+
return `${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.${++Profiler.instanceCount}`;
26+
}
27+
1928
/**
2029
* Configuration options for creating a Profiler instance.
2130
*
@@ -59,6 +68,8 @@ export type ProfilerOptions<T extends ActionTrackConfigs = ActionTrackConfigs> =
5968
*
6069
*/
6170
export class Profiler<T extends ActionTrackConfigs> {
71+
static instanceCount = 0;
72+
readonly id = getProfilerId();
6273
#enabled: boolean;
6374
readonly #defaults: ActionTrackEntryPayload;
6475
readonly tracks: Record<keyof T, ActionTrackEntryPayload> | undefined;

packages/utils/src/lib/profiler/profiler.unit.test.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
11
import { performance } from 'node:perf_hooks';
2+
import { threadId } from 'node:worker_threads';
23
import { beforeEach, describe, expect, it, vi } from 'vitest';
34
import type { ActionTrackEntryPayload } from '../user-timing-extensibility-api.type.js';
4-
import { Profiler, type ProfilerOptions } from './profiler.js';
5+
import { Profiler, type ProfilerOptions, getProfilerId } from './profiler.js';
6+
7+
describe('getProfilerId', () => {
8+
it('should generate a unique id per process', () => {
9+
expect(getProfilerId()).toBe(
10+
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.1`,
11+
);
12+
expect(getProfilerId()).toBe(
13+
`${Math.round(performance.timeOrigin)}.${process.pid}.${threadId}.2`,
14+
);
15+
});
16+
});
517

618
describe('Profiler', () => {
719
const getProfiler = (overrides?: Partial<ProfilerOptions>) =>

packages/utils/src/lib/wal.ts

Lines changed: 134 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
/* eslint-disable max-lines */
12
import * as fs from 'node:fs';
23
import path from 'node:path';
34
import process from 'node:process';
5+
import { threadId } from 'node:worker_threads';
46

57
/**
68
* Codec for encoding/decoding values to/from strings for WAL storage.
@@ -222,10 +224,6 @@ export type WalFormat<T extends object | string> = {
222224
finalExtension: string;
223225
/** Codec for encoding/decoding records */
224226
codec: Codec<T, string>;
225-
/** Function to generate shard file paths */
226-
shardPath: (id: string) => string;
227-
/** Function to generate final merged file path */
228-
finalPath: () => string;
229227
/** Finalizer for converting records to a string */
230228
finalizer: (
231229
records: (T | InvalidEntry<string>)[],
@@ -253,8 +251,6 @@ export const stringCodec = <
253251
* - walExtension defaults to '.log'
254252
* - finalExtension defaults to '.log'
255253
* - codec defaults to stringCodec<T>()
256-
* - shardPath defaults to (id: string) => `${baseName}.${id}${walExtension}`
257-
* - finalPath defaults to () => `${baseName}${finalExtension}`
258254
* - finalizer defaults to (encodedRecords: (T | InvalidEntry<string>)[]) => `${encodedRecords.join('\n')}\n`
259255
* @param format - Partial WalFormat configuration
260256
* @returns Parsed WalFormat with defaults filled in
@@ -263,12 +259,10 @@ export function parseWalFormat<T extends object | string = object>(
263259
format: Partial<WalFormat<T>>,
264260
): WalFormat<T> {
265261
const {
266-
baseName = Date.now().toString(),
262+
baseName = 'trace',
267263
walExtension = '.log',
268264
finalExtension = walExtension,
269265
codec = stringCodec<T>(),
270-
shardPath = (id: string) => `${baseName}.${id}${walExtension}`,
271-
finalPath = () => `${baseName}${finalExtension}`,
272266
finalizer = (encodedRecords: (T | InvalidEntry<string>)[]) =>
273267
`${encodedRecords.join('\n')}\n`,
274268
} = format;
@@ -278,8 +272,6 @@ export function parseWalFormat<T extends object | string = object>(
278272
walExtension,
279273
finalExtension,
280274
codec,
281-
shardPath,
282-
finalPath,
283275
finalizer,
284276
} satisfies WalFormat<T>;
285277
}
@@ -292,42 +284,147 @@ export function parseWalFormat<T extends object | string = object>(
292284
*
293285
* @returns true if this is the leader WAL process, false otherwise
294286
*/
295-
export function isLeaderWal(envVarName: string): boolean {
296-
return process.env[envVarName] === String(process.pid);
287+
export function isLeaderWal(envVarName: string, profilerID: string): boolean {
288+
return process.env[envVarName] === profilerID;
297289
}
298290

299291
/**
300292
* Initialize the origin PID environment variable if not already set.
301293
* This must be done as early as possible before any user code runs.
302-
* Set's PROFILER_ORIGIN_PID_ENV_VAR to the current process PID if not already defined.
294+
* Sets `envVarName` to the given `profilerID` if not already defined.
303295
*/
304-
export function setLeaderWal(PROFILER_ORIGIN_PID_ENV_VAR: string): void {
305-
if (!process.env[PROFILER_ORIGIN_PID_ENV_VAR]) {
296+
export function setLeaderWal(envVarName: string, profilerID: string): void {
297+
if (!process.env[envVarName]) {
306298
// eslint-disable-next-line functional/immutable-data
307-
process.env[PROFILER_ORIGIN_PID_ENV_VAR] = String(process.pid);
299+
process.env[envVarName] = profilerID;
308300
}
309301
}
310302

303+
// eslint-disable-next-line functional/no-let
304+
let shardCount = 0;
305+
/**
306+
* Generates a human-readable shard ID.
307+
* This ID is unique per process/thread/shard combination and used in the file name.
308+
* Format: readable-timestamp.pid.threadId.shardCount
309+
* Example: "20240101-120000-000.12345.1.1"
310+
* Becomes file: trace.20240101-120000-000.12345.1.1.log
311+
*/
312+
export function getShardId(): string {
313+
const timestamp = Math.round(performance.timeOrigin + performance.now());
314+
const readableTimestamp = soratebleReadableDateString(`${timestamp}`);
315+
return `${readableTimestamp}.${process.pid}.${threadId}.${++shardCount}`;
316+
}
317+
318+
/**
319+
* Generates a human-readable sharded group ID.
320+
* This ID is a sortable, human-readable date string per run (millisecond resolution, so it is unique only if no two runs start within the same millisecond).
321+
* Used directly as the folder name to group shards.
322+
* Format: yyyymmdd-hhmmss-ms
323+
* Example: "20240101-120000-000"
324+
*/
325+
export function getShardedGroupId(): string {
326+
return soratebleReadableDateString(
327+
`${Math.round(performance.timeOrigin + performance.now())}`,
328+
);
329+
}
330+
331+
/**
332+
* Regex patterns for validating WAL ID formats
333+
*/
334+
export const WAL_ID_PATTERNS = {
335+
/** Readable date format: yyyymmdd-hhmmss-ms */
336+
READABLE_DATE: /^\d{8}-\d{6}-\d{3}$/,
337+
/** Group ID format: yyyymmdd-hhmmss-ms */
338+
GROUP_ID: /^\d{8}-\d{6}-\d{3}$/,
339+
/** Shard ID format: readable-date.pid.threadId.count */
340+
SHARD_ID: /^\d{8}-\d{6}-\d{3}(?:\.\d+){3}$/,
341+
} as const;
342+
343+
export function soratebleReadableDateString(timestampMs: string): string {
344+
const timestamp = Number.parseInt(timestampMs, 10);
345+
const date = new Date(timestamp);
346+
const MILLISECONDS_PER_SECOND = 1000;
347+
const yyyy = date.getFullYear();
348+
const mm = String(date.getMonth() + 1).padStart(2, '0');
349+
const dd = String(date.getDate()).padStart(2, '0');
350+
const hh = String(date.getHours()).padStart(2, '0');
351+
const min = String(date.getMinutes()).padStart(2, '0');
352+
const ss = String(date.getSeconds()).padStart(2, '0');
353+
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
354+
const ms = String(timestamp % MILLISECONDS_PER_SECOND).padStart(3, '0');
355+
356+
return `${yyyy}${mm}${dd}-${hh}${min}${ss}-${ms}`;
357+
}
358+
/**
359+
* Generates a path to a shard file using human-readable IDs.
360+
* Both groupId and shardId are already in readable date format.
361+
*
362+
* Example with groupId "20240101-120000-000" and shardId "20240101-120000-000.12345.1.1":
363+
* Full path: /base/20240101-120000-000/trace.20240101-120000-000.12345.1.1.log
364+
*
365+
* @param dir - The directory to store the shard file
366+
* @param format - The WalFormat to use for the shard file
367+
* @param groupId - The human-readable group ID (yyyymmdd-hhmmss-ms format)
368+
* @param shardId - The human-readable shard ID (readable-timestamp.pid.threadId.count format)
369+
* @returns The path to the shard file
370+
*/
371+
export function getShardedPath<T extends object | string = object>(opt: {
372+
dir?: string;
373+
format: WalFormat<T>;
374+
groupId: string;
375+
shardId: string;
376+
}): string {
377+
const { dir = '', format, groupId, shardId } = opt;
378+
const { baseName, walExtension } = format;
379+
380+
return path.join(dir, groupId, `${baseName}.${shardId}${walExtension}`);
381+
}
382+
383+
export function getShardedFinalPath<T extends object | string = object>(opt: {
384+
dir?: string;
385+
format: WalFormat<T>;
386+
groupId: string;
387+
}): string {
388+
const { dir = '', format, groupId } = opt;
389+
const { baseName, finalExtension } = format;
390+
391+
return path.join(dir, groupId, `${baseName}.${groupId}${finalExtension}`);
392+
}
393+
311394
/**
312395
* Sharded Write-Ahead Log manager for coordinating multiple WAL shards.
313396
* Handles distributed logging across multiple processes/files with atomic finalization.
314397
*/
315398

316399
export class ShardedWal<T extends object | string = object> {
400+
readonly groupId = getShardedGroupId();
317401
readonly #format: WalFormat<T>;
318-
readonly #dir: string;
402+
readonly #dir: string = process.cwd();
319403

320404
/**
321405
* Create a sharded WAL manager.
322406
*/
323-
constructor(dir: string, format: Partial<WalFormat<T>>) {
324-
this.#dir = dir;
407+
constructor(opt: {
408+
dir?: string;
409+
format: Partial<WalFormat<T>>;
410+
groupId?: string;
411+
}) {
412+
const { dir, format, groupId } = opt;
413+
this.groupId = groupId ?? getShardedGroupId();
414+
if (dir) {
415+
this.#dir = dir;
416+
}
325417
this.#format = parseWalFormat<T>(format);
326418
}
327419

328-
shard(id: string) {
420+
shard(shardId: string = getShardId()) {
329421
return new WriteAheadLogFile({
330-
file: path.join(this.#dir, this.#format.shardPath(id)),
422+
file: getShardedPath({
423+
dir: this.#dir,
424+
format: this.#format,
425+
groupId: this.groupId,
426+
shardId,
427+
}),
331428
codec: this.#format.codec,
332429
});
333430
}
@@ -338,10 +435,18 @@ export class ShardedWal<T extends object | string = object> {
338435
return [];
339436
}
340437

438+
const groupIdDir = path.dirname(
439+
getShardedFinalPath({
440+
dir: this.#dir,
441+
format: this.#format,
442+
groupId: this.groupId,
443+
}),
444+
);
341445
return fs
342-
.readdirSync(this.#dir)
446+
.readdirSync(groupIdDir)
343447
.filter(entry => entry.endsWith(this.#format.walExtension))
344-
.map(entry => path.join(this.#dir, entry));
448+
.filter(entry => entry.startsWith(`${this.#format.baseName}`))
449+
.map(entry => path.join(groupIdDir, entry));
345450
}
346451

347452
/**
@@ -368,7 +473,11 @@ export class ShardedWal<T extends object | string = object> {
368473
const recordsToFinalize = hasInvalidEntries
369474
? records
370475
: filterValidRecords(records);
371-
const out = path.join(this.#dir, this.#format.finalPath());
476+
const out = getShardedFinalPath({
477+
dir: this.#dir,
478+
format: this.#format,
479+
groupId: this.groupId,
480+
});
372481
fs.mkdirSync(path.dirname(out), {
373482
recursive: true,
374483
});
@@ -389,19 +498,3 @@ export class ShardedWal<T extends object | string = object> {
389498
});
390499
}
391500
}
392-
393-
/**
394-
* Generates a shard ID.
395-
* This is idempotent since PID and TID are fixed for the process/thread.
396-
*/
397-
export function getShardId(pid: number, tid: number = 0): string {
398-
return `${pid}-${tid}`;
399-
}
400-
401-
/**
402-
* Generates a sharded group ID based on performance.timeOrigin.
403-
* This is idempotent per process since timeOrigin is fixed within a process and its worker.
404-
*/
405-
export function getShardedGroupId(): string {
406-
return Math.floor(performance.timeOrigin).toString();
407-
}

0 commit comments

Comments
 (0)