Skip to content

Commit 27e76f2

Browse files
committed
fix: adjust observer logic
1 parent cb61834 commit 27e76f2

File tree

6 files changed

+890
-143
lines changed

6 files changed

+890
-143
lines changed

packages/utils/src/lib/performance-observer.int.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ describe('PerformanceObserverSink', () => {
150150
it('should observe buffered performance entries when buffered is enabled', async () => {
151151
const observer = new PerformanceObserverSink({
152152
...options,
153-
buffered: true,
153+
captureBufferedEntries: true,
154154
});
155155

156156
performance.mark('test-mark-1');

packages/utils/src/lib/performance-observer.ts

Lines changed: 257 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,217 @@
1-
import {
2-
type PerformanceEntry,
3-
PerformanceObserver,
4-
type PerformanceObserverEntryList,
5-
performance,
6-
} from 'node:perf_hooks';
7-
import type { Buffered, Encoder, Observer, Sink } from './sink-source.type';
1+
import { type PerformanceEntry, PerformanceObserver } from 'node:perf_hooks';
2+
import type { Buffered, Observer, Sink } from './sink-source.type';
83

94
/**
105
* Encoder that converts PerformanceEntry to domain events.
116
*
12-
* Pure function that transforms performance entries into one or more domain events.
7+
* Pure function that transforms performance entries into domain events.
138
* Should be stateless, synchronous, and have no side effects.
9+
* Returns a readonly array of encoded items.
1410
*/
1511
export type PerformanceEntryEncoder<F> = (
1612
entry: PerformanceEntry,
1713
) => readonly F[];
1814

15+
/**
16+
* Array of performance entry types that this observer monitors.
17+
* Only 'mark' and 'measure' entries are tracked as they represent
18+
* user-defined performance markers and measurements.
19+
*/
1920
const OBSERVED_TYPES = ['mark', 'measure'] as const;
2021
type ObservedEntryType = 'mark' | 'measure';
22+
const OBSERVED_TYPE_SET = new Set<ObservedEntryType>(OBSERVED_TYPES);
23+
24+
/**
25+
* Default threshold for triggering queue flushes based on queue length.
26+
* When the queue length reaches (maxQueueSize - flushThreshold),
27+
* a flush is triggered to prevent overflow. This provides a buffer zone
28+
* before hitting the maximum queue capacity.
29+
*/
2130
export const DEFAULT_FLUSH_THRESHOLD = 20;
2231

32+
/**
33+
* Default maximum number of items allowed in the queue before entries are dropped.
34+
* This acts as a memory safety limit to prevent unbounded memory growth
35+
* in case of sink slowdown or high-frequency performance entries.
36+
*/
37+
export const DEFAULT_MAX_QUEUE_SIZE = 10_000;
38+
39+
/**
40+
* Validates the flush threshold configuration to ensure sensible bounds.
41+
*
42+
* The flush threshold must be positive and cannot exceed the maximum queue size,
43+
* as it represents a buffer zone within the queue capacity.
44+
*
45+
* @param flushThreshold - The threshold value to validate (must be > 0)
46+
* @param maxQueueSize - The maximum queue size for comparison (flushThreshold <= maxQueueSize)
47+
* @throws {Error} If flushThreshold is not positive or exceeds maxQueueSize
48+
*/
49+
export function validateFlushThreshold(
50+
flushThreshold: number,
51+
maxQueueSize: number,
52+
): void {
53+
if (flushThreshold <= 0) {
54+
throw new Error('flushThreshold must be > 0');
55+
}
56+
if (flushThreshold > maxQueueSize) {
57+
throw new Error('flushThreshold must be <= maxQueueSize');
58+
}
59+
}
60+
61+
/**
62+
* Configuration options for the PerformanceObserverSink.
63+
*
64+
* @template T - The type of encoded performance data that will be written to the sink
65+
*/
2366
export type PerformanceObserverOptions<T> = {
67+
/**
68+
* The sink where encoded performance entries will be written.
69+
* Must implement the Sink interface for handling the encoded data.
70+
*/
2471
sink: Sink<T, unknown>;
72+
73+
/**
74+
* Function that encodes raw PerformanceEntry objects into domain-specific types.
75+
* This transformer converts Node.js performance entries into application-specific data structures.
76+
* Returns a readonly array of encoded items.
77+
*/
2578
encodePerfEntry: PerformanceEntryEncoder<T>;
26-
buffered?: boolean;
79+
80+
/**
81+
* Whether to enable buffered observation mode.
82+
* When true, captures all performance entries that occurred before observation started.
83+
* When false, only captures entries after subscription begins.
84+
*
85+
* @default true
86+
*/
87+
captureBufferedEntries?: boolean;
88+
89+
/**
90+
* Threshold for triggering queue flushes based on queue length.
91+
* Flushes occur when queue length reaches (maxQueueSize - flushThreshold).
92+
* Larger values provide more buffer space before hitting capacity limits.
93+
*
94+
* @default DEFAULT_FLUSH_THRESHOLD (20)
95+
*/
2796
flushThreshold?: number;
97+
98+
/**
99+
* Maximum number of items allowed in the queue before new entries are dropped.
100+
* Acts as a memory safety limit to prevent unbounded growth during sink slowdown.
101+
*
102+
* @default DEFAULT_MAX_QUEUE_SIZE (10000)
103+
*/
104+
maxQueueSize?: number;
28105
};
29106

30-
export class PerformanceObserverSink<T>
31-
implements Observer, Buffered, Encoder<PerformanceEntry, readonly T[]>
32-
{
107+
/**
108+
* A sink implementation that observes Node.js performance entries and forwards them to a configurable sink.
109+
*
110+
* This class provides a buffered, memory-safe bridge between Node.js PerformanceObserver
111+
* and application-specific data sinks. It handles performance entry encoding, queue management,
112+
* and graceful degradation under high load conditions.
113+
*
114+
* @template T - The type of encoded performance data written to the sink
115+
* @implements {Observer} - Lifecycle management interface
116+
* @implements {Buffered} - Queue statistics interface
117+
*/
118+
export class PerformanceObserverSink<T> implements Observer, Buffered {
119+
/** Encoder function for transforming PerformanceEntry objects into domain types */
33120
#encodePerfEntry: PerformanceEntryEncoder<T>;
121+
122+
/** Whether buffered observation mode is enabled */
34123
#buffered: boolean;
124+
125+
/** Threshold for triggering flushes based on queue length proximity to max capacity */
35126
#flushThreshold: number;
127+
128+
/** Maximum number of items allowed in queue before dropping new entries (hard memory limit) */
129+
#maxQueueSize: number;
130+
131+
/** The target sink where encoded performance data is written */
36132
#sink: Sink<T, unknown>;
133+
134+
/** Node.js PerformanceObserver instance, undefined when not subscribed */
37135
#observer: PerformanceObserver | undefined;
38136

39-
#pendingCount = 0;
137+
/** Bounded queue storing encoded performance items awaiting flush */
138+
#queue: T[] = [];
139+
140+
/** Count of performance entries dropped due to queue overflow */
141+
#dropped = 0;
142+
143+
/** Count of performance entries successfully written to sink */
144+
#written = 0;
40145

41-
// "cursor" per type: how many we already wrote from the global buffer
42-
#written: Map<ObservedEntryType, number>;
146+
/** Number of items added to queue since last successful flush */
147+
#addedSinceLastFlush = 0;
43148

149+
/**
150+
* Creates a new PerformanceObserverSink with the specified configuration.
151+
*
152+
* @param options - Configuration options for the performance observer sink
153+
* @throws {Error} If flushThreshold validation fails (must be > 0 and <= maxQueueSize)
154+
*/
44155
constructor(options: PerformanceObserverOptions<T>) {
45-
const { encodePerfEntry, sink, buffered, flushThreshold } = options;
156+
const {
157+
encodePerfEntry,
158+
sink,
159+
captureBufferedEntries,
160+
flushThreshold = DEFAULT_FLUSH_THRESHOLD,
161+
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE,
162+
} = options;
46163
this.#encodePerfEntry = encodePerfEntry;
47-
this.#written = new Map<ObservedEntryType, number>(
48-
OBSERVED_TYPES.map(t => [t, 0]),
49-
);
50164
this.#sink = sink;
51-
this.#buffered = buffered ?? false;
52-
this.#flushThreshold = flushThreshold ?? DEFAULT_FLUSH_THRESHOLD;
165+
this.#buffered = captureBufferedEntries ?? true;
166+
this.#maxQueueSize = maxQueueSize;
167+
validateFlushThreshold(flushThreshold, this.#maxQueueSize);
168+
this.#flushThreshold = flushThreshold;
53169
}
54170

171+
/**
172+
* Returns current queue statistics for monitoring and debugging.
173+
*
174+
* Provides insight into the current state of the performance entry queue,
175+
* useful for monitoring memory usage and processing throughput.
176+
*
177+
* @returns Object containing all states and entry counts
178+
*/
179+
getStats() {
180+
return {
181+
isSubscribed: this.isSubscribed(),
182+
queued: this.#queue.length,
183+
dropped: this.#dropped,
184+
written: this.#written,
185+
maxQueueSize: this.#maxQueueSize,
186+
flushThreshold: this.#flushThreshold,
187+
addedSinceLastFlush: this.#addedSinceLastFlush,
188+
buffered: this.#buffered,
189+
};
190+
}
191+
192+
/**
193+
* Encodes a raw PerformanceEntry using the configured encoder function.
194+
*
195+
* This method delegates to the user-provided encoder function, allowing
196+
* transformation of Node.js performance entries into application-specific types.
197+
*
198+
* @param entry - The raw performance entry to encode
199+
* @returns Readonly array of encoded items
200+
*/
55201
encode(entry: PerformanceEntry): readonly T[] {
56202
return this.#encodePerfEntry(entry);
57203
}
58204

205+
/**
206+
* Starts observing performance entries and forwarding them to the sink.
207+
*
208+
* Creates a Node.js PerformanceObserver that monitors 'mark' and 'measure' entries.
209+
* The observer uses a bounded queue with proactive flushing to manage memory usage.
210+
* When buffered mode is enabled, any existing buffered entries are immediately flushed.
211+
*
212+
* @throws {Error} If the sink is closed before subscription
213+
*
214+
*/
59215
subscribe(): void {
60216
if (this.#observer) {
61217
return;
@@ -66,66 +222,110 @@ export class PerformanceObserverSink<T>
66222
);
67223
}
68224

69-
// Only used to trigger the flush - it's not processing the entries, just counting them
70-
this.#observer = new PerformanceObserver(
71-
(list: PerformanceObserverEntryList) => {
72-
const batchCount = OBSERVED_TYPES.reduce(
73-
(n, t) => n + list.getEntriesByType(t).length,
74-
0,
75-
);
76-
77-
this.#pendingCount += batchCount;
78-
if (this.#pendingCount >= this.#flushThreshold) {
79-
this.flush();
225+
this.#observer = new PerformanceObserver(list => {
226+
if (this.#sink.isClosed()) {
227+
this.#queue.length = 0;
228+
return;
229+
}
230+
231+
list.getEntries().forEach(entry => {
232+
if (OBSERVED_TYPE_SET.has(entry.entryType as ObservedEntryType)) {
233+
const items = this.encode(entry);
234+
items.forEach(item => {
235+
if (this.#queue.length >= this.#maxQueueSize) {
236+
this.#dropped++;
237+
return;
238+
}
239+
240+
if (
241+
this.#queue.length >=
242+
this.#maxQueueSize - this.#flushThreshold
243+
) {
244+
this.flush();
245+
}
246+
this.#queue.push(item);
247+
this.#addedSinceLastFlush++;
248+
});
80249
}
81-
},
82-
);
250+
});
251+
252+
if (this.#addedSinceLastFlush >= this.#flushThreshold) {
253+
this.flush();
254+
}
255+
});
83256

84257
this.#observer.observe({
85258
entryTypes: OBSERVED_TYPES,
86259
buffered: this.#buffered,
87260
});
261+
262+
if (this.#buffered) {
263+
this.flush();
264+
}
88265
}
89266

267+
/**
268+
* Flushes all queued performance entries to the sink.
269+
*
270+
* Writes all currently queued encoded performance entries to the configured sink.
271+
* If the sink is closed during flush, the queue is cleared without writing.
272+
* The queue is always cleared after flush attempt, regardless of success or failure.
273+
*
274+
* @throws {Error} If sink write operations fail (with original error as cause)
275+
*/
90276
flush(): void {
91-
if (!this.#observer) {
277+
if (this.#queue.length === 0) {
92278
return;
93279
}
94280
if (this.#sink.isClosed()) {
281+
// clear queue and drop items when sink closes unexpectedly
282+
this.#queue.length = 0;
283+
return;
284+
}
285+
286+
try {
287+
this.#queue.forEach(item => {
288+
this.#sink.write(item);
289+
this.#written++;
290+
});
291+
} catch (error) {
95292
throw new Error(
96-
`Sink ${this.#sink.constructor.name} must be opened before subscribing PerformanceObserver`,
293+
'PerformanceObserverSink failed to write items to sink.',
294+
{ cause: error },
97295
);
296+
} finally {
297+
this.#queue.length = 0;
298+
this.#addedSinceLastFlush = 0;
98299
}
99-
100-
OBSERVED_TYPES.forEach(t => {
101-
const written = this.#written.get(t) ?? 0;
102-
const fresh = performance.getEntriesByType(t).slice(written);
103-
104-
try {
105-
fresh
106-
.flatMap(entry => this.encode(entry))
107-
.forEach(item => this.#sink.write(item));
108-
109-
this.#written.set(t, written + fresh.length);
110-
} catch (error) {
111-
throw new Error(
112-
'PerformanceObserverSink failed to write items to sink.',
113-
{ cause: error },
114-
);
115-
}
116-
});
117-
118-
this.#pendingCount = 0;
119300
}
120301

302+
/**
303+
* Stops observing performance entries and cleans up resources.
304+
*
305+
* Performs a final flush of any remaining queued entries, then disconnects
306+
* the PerformanceObserver and releases all references.
307+
*
308+
* This method is idempotent - safe to call multiple times.
309+
*/
121310
unsubscribe(): void {
122311
if (!this.#observer) {
123312
return;
124313
}
125-
this.#observer?.disconnect();
314+
this.flush();
315+
this.#queue.length = 0;
316+
this.#addedSinceLastFlush = 0;
317+
this.#observer.disconnect();
126318
this.#observer = undefined;
127319
}
128320

321+
/**
322+
* Checks whether the performance observer is currently active.
323+
*
324+
* Returns true if the sink is subscribed and actively observing performance entries.
325+
* This indicates that a PerformanceObserver instance exists and is connected.
326+
*
327+
* @returns true if currently subscribed and observing, false otherwise
328+
*/
129329
isSubscribed(): boolean {
130330
return this.#observer !== undefined;
131331
}

0 commit comments

Comments
 (0)