Skip to content

Commit 0e3f706

Browse files
authored
Merge pull request #42 from sergeyzenchenko/master
Add array stream decoding
2 parents 0972f1a + 50011dd commit 0e3f706

File tree

4 files changed

+104
-15
lines changed

4 files changed

+104
-15
lines changed

benchmark/sync-vs-async.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!ts-node
22
/* eslint-disable no-console */
33

4-
import { encode, decode, decodeAsync } from "../src";
4+
import { encode, decode, decodeAsync, decodeArrayStream } from "../src";
55
import { writeFileSync, unlinkSync, readFileSync, createReadStream } from "fs";
66
import { deepStrictEqual } from "assert";
77

@@ -39,4 +39,14 @@ import { deepStrictEqual } from "assert";
3939
await decodeAsync(createReadStream(file));
4040
}
4141
console.timeEnd("creteReadStream |> decodeAsync");
42+
43+
// asyncArrayStream
44+
45+
console.time("creteReadStream |> decodeArrayStream");
46+
for (let i = 0; i < 100; i++) {
47+
for await (let result of decodeArrayStream(createReadStream(file))) {
48+
// console.log(result);
49+
}
50+
}
51+
console.timeEnd("creteReadStream |> decodeArrayStream");
4252
})();

src/Decoder.ts

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ export class Decoder {
7474
this.pos = 0;
7575
}
7676

77+
appendBuffer(buffer: Uint8Array | ArrayLike<number>) {
78+
if (this.headByte === HEAD_BYTE_REQUIRED && !this.hasRemaining()) {
79+
this.setBuffer(buffer);
80+
} else {
81+
// retried because data is insufficient
82+
const remainingData = this.bytes.subarray(this.pos);
83+
const newData = ensureUint8Array(buffer);
84+
const concated = new Uint8Array(remainingData.length + newData.length);
85+
concated.set(remainingData);
86+
concated.set(newData, remainingData.length);
87+
this.setBuffer(concated);
88+
}
89+
}
90+
7791
hasRemaining(size = 1) {
7892
return this.view.byteLength - this.pos >= size;
7993
}
@@ -99,18 +113,7 @@ export class Decoder {
99113
throw this.createNoExtraBytesError(this.totalPos);
100114
}
101115

102-
if (this.headByte === HEAD_BYTE_REQUIRED && !this.hasRemaining()) {
103-
this.setBuffer(buffer);
104-
} else {
105-
// retried because data is insufficient
106-
const remainingData = this.bytes.subarray(this.pos);
107-
const newData = ensureUint8Array(buffer);
108-
const concated = new Uint8Array(remainingData.length + newData.length);
109-
concated.set(remainingData);
110-
concated.set(newData, remainingData.length);
111-
this.setBuffer(concated);
112-
}
113-
//console.log("view", this.view, this.headByte);
116+
this.appendBuffer(buffer);
114117

115118
try {
116119
object = this.decodeSync();
@@ -137,6 +140,47 @@ export class Decoder {
137140
);
138141
}
139142

143+
async *decodeArrayStream(stream: AsyncIterable<ArrayLike<number> | Uint8Array>) {
144+
let headerParsed = false;
145+
let decoded = false;
146+
let itemsLeft = 0;
147+
148+
for await (const buffer of stream) {
149+
if (decoded) {
150+
throw this.createNoExtraBytesError(this.totalPos);
151+
}
152+
153+
this.appendBuffer(buffer);
154+
155+
if (!headerParsed) {
156+
itemsLeft = this.readArraySize();
157+
headerParsed = true;
158+
this.complete();
159+
}
160+
161+
try {
162+
while (true) {
163+
let result = this.decodeSync();
164+
165+
yield result;
166+
167+
itemsLeft--;
168+
169+
if (itemsLeft === 0) {
170+
decoded = true;
171+
break;
172+
}
173+
}
174+
} catch (e) {
175+
if (!(e instanceof DataViewIndexOutOfBoundsError)) {
176+
throw e; // rethrow
177+
}
178+
// fallthrough
179+
}
180+
this.totalPos += this.pos;
181+
}
182+
}
183+
140184
decodeSync(): unknown {
141185
DECODE: while (true) {
142186
const headByte = this.readHeadByte();
@@ -363,6 +407,24 @@ export class Decoder {
363407
this.headByte = HEAD_BYTE_REQUIRED;
364408
}
365409

410+
readArraySize(): number {
411+
const headByte = this.readHeadByte();
412+
413+
switch (headByte) {
414+
case 0xdc:
415+
return this.readU16();
416+
case 0xdd:
417+
return this.readU32();
418+
default: {
419+
if (headByte < 0xa0) {
420+
return headByte - 0x90;
421+
} else {
422+
throw new Error(`Unrecognized array type byte: ${prettyByte(headByte)}`);
423+
}
424+
}
425+
}
426+
}
427+
366428
pushMapState(size: number) {
367429
if (size > this.maxMapLength) {
368430
throw new Error(`Max length exceeded: map length (${size}) > maxMapLengthLength (${this.maxMapLength})`);

src/decodeAsync.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { ExtensionCodecType } from "./ExtensionCodec";
21
import { Decoder } from "./Decoder";
32
import { defaultDecodeOptions, DecodeOptions } from "./decode";
43

@@ -19,3 +18,21 @@ export async function decodeAsync(
1918
);
2019
return decoder.decodeOneAsync(stream);
2120
}
21+
22+
export async function* decodeArrayStream(
23+
stream: AsyncIterable<Uint8Array | ArrayLike<number>>,
24+
options: DecodeAsyncOptions = defaultDecodeOptions,
25+
) {
26+
const decoder = new Decoder(
27+
options.extensionCodec,
28+
options.maxStrLength,
29+
options.maxBinLength,
30+
options.maxArrayLength,
31+
options.maxMapLength,
32+
options.maxExtLength,
33+
);
34+
35+
for await (let item of decoder.decodeArrayStream(stream)) {
36+
yield item;
37+
}
38+
}

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
export { encode } from "./encode";
44
export { decode } from "./decode";
5-
export { decodeAsync } from "./decodeAsync";
5+
export { decodeAsync, decodeArrayStream } from "./decodeAsync";
66

77
// Utilitiies for Extension Types:
88

0 commit comments

Comments
 (0)