Skip to content

Commit 3b7b4f2

Browse files
committed
support whatwg streams in decodeAsync() and decodeArrayStream()
1 parent ec1a37a commit 3b7b4f2

File tree

4 files changed

+56
-28
lines changed

4 files changed

+56
-28
lines changed

src/decodeAsync.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import { Decoder } from "./Decoder";
22
import { defaultDecodeOptions, DecodeOptions } from "./decode";
3+
import { isReadableStream, asyncIterableFromStream } from "./utils/stream";
34

45
export type DecodeAsyncOptions = DecodeOptions;
56
export const defaultDecodeAsyncOptions = defaultDecodeOptions;
67

8+
type StreamLike<T> = AsyncIterable<T> | ReadableStream<T>;
9+
710
export async function decodeAsync(
8-
stream: AsyncIterable<Uint8Array | ArrayLike<number>>,
11+
streamLike: StreamLike<Uint8Array | ArrayLike<number>>,
912
options: DecodeAsyncOptions = defaultDecodeOptions,
1013
): Promise<unknown> {
14+
const stream = isReadableStream(streamLike) ? asyncIterableFromStream(streamLike) : streamLike;
15+
1116
const decoder = new Decoder(
1217
options.extensionCodec,
1318
options.maxStrLength,
@@ -20,9 +25,11 @@ export async function decodeAsync(
2025
}
2126

2227
export async function* decodeArrayStream(
23-
stream: AsyncIterable<Uint8Array | ArrayLike<number>>,
28+
streamLike: StreamLike<Uint8Array | ArrayLike<number>>,
2429
options: DecodeAsyncOptions = defaultDecodeOptions,
2530
) {
31+
const stream = isReadableStream(streamLike) ? asyncIterableFromStream(streamLike) : streamLike;
32+
2633
const decoder = new Decoder(
2734
options.extensionCodec,
2835
options.maxStrLength,

src/utils/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// utility for whatwg streams
22

33
export function isReadableStream<T>(object: unknown): object is ReadableStream<T> {
4-
return typeof ReadableStream !== "object" && object instanceof ReadableStream;
4+
return typeof ReadableStream !== "undefined" && object instanceof ReadableStream;
55
}
66

77
export async function* asyncIterableFromStream<T>(stream: ReadableStream<T>): AsyncIterable<T> {

test/readme.test.ts

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { deepStrictEqual } from "assert";
2-
import { encode, decode, decodeAsync } from "@msgpack/msgpack";
3-
import { asyncIterableFromStream } from "../src/utils/stream";
2+
import { encode, decode } from "@msgpack/msgpack";
43

54
describe("README", () => {
65
context("## Synopsis", () => {
@@ -22,27 +21,4 @@ describe("README", () => {
2221
deepStrictEqual(decode(encoded), object);
2322
});
2423
});
25-
26-
context("## ReadableStream", () => {
27-
before(function() {
28-
if (typeof ReadableStream === "undefined") {
29-
this.skip();
30-
}
31-
});
32-
33-
it("reads from stream", async () => {
34-
const data = [1, 2, 3];
35-
const encoded = encode(data);
36-
const stream = new ReadableStream({
37-
start(controller) {
38-
for (const byte of encoded) {
39-
controller.enqueue([byte]);
40-
}
41-
controller.close();
42-
},
43-
});
44-
45-
deepStrictEqual(await decodeAsync(asyncIterableFromStream(stream)), data);
46-
});
47-
});
4824
});

test/whatwg-streams.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { deepStrictEqual } from "assert";
2+
import { decodeAsync, encode, decodeArrayStream } from "@msgpack/msgpack";
3+
import { constants } from "http2";
4+
5+
describe("whatwg streams", () => {
6+
before(function() {
7+
if (typeof ReadableStream === "undefined") {
8+
this.skip();
9+
}
10+
});
11+
12+
context("decodeAsync", async () => {
13+
const data = [1, 2, 3];
14+
const encoded = encode(data);
15+
const stream = new ReadableStream({
16+
start(controller) {
17+
for (const byte of encoded) {
18+
controller.enqueue([byte]);
19+
}
20+
controller.close();
21+
},
22+
});
23+
24+
const items = [];
25+
for await (const item of decodeArrayStream(stream)) {
26+
items.push(item);
27+
}
28+
deepStrictEqual(items, data);
29+
});
30+
31+
it("reads from stream", async () => {
32+
const data = [1, 2, 3];
33+
const encoded = encode(data);
34+
const stream = new ReadableStream({
35+
start(controller) {
36+
for (const byte of encoded) {
37+
controller.enqueue([byte]);
38+
}
39+
controller.close();
40+
},
41+
});
42+
43+
deepStrictEqual(await decodeAsync(stream), data);
44+
});
45+
});

0 commit comments

Comments
 (0)