Skip to content

Commit e20e804

Browse files
committed
refactor: wip
1 parent eb559f5 commit e20e804

File tree

4 files changed

+340
-109
lines changed

4 files changed

+340
-109
lines changed

packages/utils/src/lib/performance-observer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import {
44
type PerformanceObserverEntryList,
55
performance,
66
} from 'node:perf_hooks';
7-
import type { WriteAheadLogFile } from './wal.js';
7+
import type { AppendableSink } from './wal.js';
88

99
const OBSERVED_TYPES = ['mark', 'measure'] as const;
1010
type ObservedEntryType = 'mark' | 'measure';
1111
export const DEFAULT_FLUSH_THRESHOLD = 20;
1212

1313
export type PerformanceObserverOptions<T> = {
14-
sink: WriteAheadLogFile<T>;
14+
sink: AppendableSink<T>;
1515
encode: (entry: PerformanceEntry) => T[];
1616
buffered?: boolean;
1717
flushThreshold?: number;
@@ -21,7 +21,7 @@ export class PerformanceObserverSink<T> {
2121
#encode: (entry: PerformanceEntry) => T[];
2222
#buffered: boolean;
2323
#flushThreshold: number;
24-
#sink: WriteAheadLogFile<T>;
24+
#sink: AppendableSink<T>;
2525
#observer: PerformanceObserver | undefined;
2626

2727
#pendingCount = 0;

packages/utils/src/lib/performance-observer.unit.test.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,6 @@ describe('PerformanceObserverSink', () => {
268268
codec: failingCodec,
269269
});
270270

271-
// Mock the append method to throw
272271
vi.spyOn(failingSink, 'append').mockImplementation(() => {
273272
throw new Error('Sink write failed');
274273
});
@@ -317,4 +316,26 @@ describe('PerformanceObserverSink', () => {
317316
}),
318317
);
319318
});
319+
320+
it('accepts custom sinks with append method', () => {
321+
// Create a simple in-memory sink that just collects items
322+
const collectedItems: string[] = [];
323+
const customSink = {
324+
append: (item: string) => collectedItems.push(item),
325+
};
326+
327+
const observer = new PerformanceObserverSink({
328+
sink: customSink,
329+
encode: (entry: PerformanceEntry) => [`${entry.name}:${entry.duration}`],
330+
});
331+
332+
observer.subscribe();
333+
334+
const mockObserver = MockPerformanceObserver.lastInstance();
335+
mockObserver?.emitMark('test-mark');
336+
337+
observer.flush();
338+
339+
expect(collectedItems).toContain('test-mark:0');
340+
});
320341
});

packages/utils/src/lib/wal.ts

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@ export type Codec<I, O = string> = {
1515

1616
export type InvalidEntry<O = string> = { __invalid: true; raw: O };
1717

18+
/**
19+
* Interface for sinks that can append items.
20+
* Allows for different types of appendable storage (WAL, in-memory, etc.)
21+
*/
22+
export interface AppendableSink<T> {
23+
append: (item: T) => void;
24+
}
25+
1826
/**
1927
* Result of recovering records from a WAL file.
2028
* Contains successfully recovered records and any errors encountered during parsing.
@@ -28,32 +36,20 @@ export type RecoverResult<T> = {
2836
partialTail: string | null;
2937
};
3038

31-
export const createTolerantCodec = <I, O = string>(
32-
codecOrEncode: ((v: I) => O) | { encode: (v: I) => O; decode: (d: O) => I },
33-
decode?: (d: O) => I,
34-
): Codec<I | InvalidEntry<O>, O> => {
35-
if (typeof codecOrEncode === 'function' && !decode) {
36-
throw new Error(
37-
'decode function must be provided when codecOrEncode is a function',
38-
);
39-
}
40-
41-
const encodeFn =
42-
typeof codecOrEncode === 'function' ? codecOrEncode : codecOrEncode.encode;
43-
44-
const decodeFn =
45-
typeof codecOrEncode === 'function'
46-
? (decode as (d: O) => I)
47-
: codecOrEncode.decode;
39+
export const createTolerantCodec = <I, O = string>(codec: {
40+
encode: (v: I) => O;
41+
decode: (d: O) => I;
42+
}): Codec<I | InvalidEntry<O>, O> => {
43+
const { encode, decode } = codec;
4844

4945
return {
5046
encode: v =>
5147
v && typeof v === 'object' && '__invalid' in v
5248
? (v as InvalidEntry<O>).raw
53-
: encodeFn(v as I),
49+
: encode(v as I),
5450
decode: d => {
5551
try {
56-
return decodeFn(d);
52+
return decode(d);
5753
} catch {
5854
return { __invalid: true, raw: d };
5955
}
@@ -118,7 +114,7 @@ export function recoverFromContent<T>(
118114
* Write-Ahead Log implementation for crash-safe append-only logging.
119115
* Provides atomic operations for writing, recovering, and repacking log entries.
120116
*/
121-
export class WriteAheadLogFile<T> {
117+
export class WriteAheadLogFile<T> implements AppendableSink<T> {
122118
#fd: number | null = null;
123119
readonly #file: string;
124120
readonly #decode: Codec<T | InvalidEntry<string>>['decode'];
@@ -135,11 +131,6 @@ export class WriteAheadLogFile<T> {
135131
this.#encode = c.encode;
136132
}
137133

138-
/** Get the file path for this WAL */
139-
get path() {
140-
return this.#file;
141-
}
142-
143134
/** Get the file path for this WAL */
144135
getPath = () => this.#file;
145136

@@ -198,13 +189,16 @@ export class WriteAheadLogFile<T> {
198189
this.close();
199190
const r = this.recover();
200191
if (r.errors.length > 0) {
201-
// Log repack failure - could add proper logging here
192+
console.log('WAL repack encountered decode errors');
202193
}
203194

204195
// Check if any records are invalid entries (from tolerant codec)
205196
const hasInvalidEntries = r.records.some(
206197
rec => typeof rec === 'object' && rec != null && '__invalid' in rec,
207198
);
199+
if (hasInvalidEntries) {
200+
console.log('Found invalid entries during WAL repack');
201+
}
208202
const recordsToWrite = hasInvalidEntries
209203
? r.records
210204
: filterValidRecords(r.records);

0 commit comments

Comments
 (0)