Skip to content

Commit fac7961

Browse files
committed
Add streaming jsonl parser
1 parent 9a0bff6 commit fac7961

File tree

1 file changed

+41
-13
lines changed

1 file changed

+41
-13
lines changed
Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,54 @@
1-
import { readFile } from "fs-extra";
1+
import { stat } from "fs/promises";
2+
import { createReadStream } from "fs-extra";
3+
4+
const doubleLineBreakRegexp = /\n\r?\n/;
25

36
/**
47
* Read a file consisting of multiple JSON objects. Each object is separated from the previous one
58
* by a double newline sequence. This is basically a more human-readable form of JSONL.
69
*
7-
* The current implementation reads the entire text of the document into memory, but in the future
8-
* it will stream the document to improve the performance with large documents.
9-
*
1010
* @param path The path to the file.
1111
* @param handler Callback to be invoked for each top-level JSON object in order.
1212
*/
1313
export async function readJsonlFile<T>(
1414
path: string,
1515
handler: (value: T) => Promise<void>,
16+
logger?: { log: (message: string) => void },
1617
): Promise<void> {
17-
const logSummary = await readFile(path, "utf-8");
18-
19-
// Remove newline delimiters because summary is in .jsonl format.
20-
const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
21-
22-
for (const obj of jsonSummaryObjects) {
23-
const jsonObj = JSON.parse(obj) as T;
24-
await handler(jsonObj);
25-
}
18+
void logger?.log(
19+
`Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
20+
);
21+
return new Promise((resolve, reject) => {
22+
const stream = createReadStream(path, { encoding: "utf8" });
23+
let buffer = "";
24+
stream.on("data", async (chunk: string) => {
25+
const parts = (buffer + chunk).split(doubleLineBreakRegexp);
26+
buffer = parts.pop()!;
27+
if (parts.length > 0) {
28+
try {
29+
stream.pause();
30+
for (const part of parts) {
31+
await handler(JSON.parse(part));
32+
}
33+
stream.resume();
34+
} catch (e) {
35+
stream.destroy();
36+
reject(e);
37+
}
38+
}
39+
});
40+
stream.on("end", async () => {
41+
if (buffer.trim().length > 0) {
42+
try {
43+
await handler(JSON.parse(buffer));
44+
} catch (e) {
45+
reject(e);
46+
return;
47+
}
48+
}
49+
void logger?.log(`Finishing parsing ${path}`);
50+
resolve();
51+
});
52+
stream.on("error", reject);
53+
});
2654
}

0 commit comments

Comments
 (0)