Skip to content

Commit 875ecd2

Browse files
authored
Merge pull request #43 from msgpack/whatwg-streams
support whatwg-stream.ReadableStream as decodeAsync() input source
2 parents 0e3f706 + df769d0 commit 875ecd2

File tree

4 files changed

+84
-4
lines changed

4 files changed

+84
-4
lines changed

src/decodeAsync.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import { Decoder } from "./Decoder";
22
import { defaultDecodeOptions, DecodeOptions } from "./decode";
3+
import { ensureAsyncIterabe, ReadableStreamLike } from "./utils/stream";
34

45
export type DecodeAsyncOptions = DecodeOptions;
56
export const defaultDecodeAsyncOptions = defaultDecodeOptions;
67

78
export async function decodeAsync(
8-
stream: AsyncIterable<Uint8Array | ArrayLike<number>>,
9+
streamLike: ReadableStreamLike<Uint8Array | ArrayLike<number>>,
910
options: DecodeAsyncOptions = defaultDecodeOptions,
1011
): Promise<unknown> {
12+
const stream = ensureAsyncIterabe(streamLike);
13+
1114
const decoder = new Decoder(
1215
options.extensionCodec,
1316
options.maxStrLength,
@@ -20,9 +23,11 @@ export async function decodeAsync(
2023
}
2124

2225
export async function* decodeArrayStream(
23-
stream: AsyncIterable<Uint8Array | ArrayLike<number>>,
26+
streamLike: ReadableStreamLike<Uint8Array | ArrayLike<number>>,
2427
options: DecodeAsyncOptions = defaultDecodeOptions,
2528
) {
29+
const stream = ensureAsyncIterabe(streamLike);
30+
2631
const decoder = new Decoder(
2732
options.extensionCodec,
2833
options.maxStrLength,

src/utils/stream.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// utility for whatwg streams
2+
3+
export type ReadableStreamLike<T> = AsyncIterable<T> | ReadableStream<T>;
4+
5+
export function isReadableStream<T>(object: unknown): object is ReadableStream<T> {
6+
return typeof ReadableStream !== "undefined" && object instanceof ReadableStream;
7+
}
8+
9+
export async function* asyncIterableFromStream<T>(stream: ReadableStream<T>): AsyncIterable<T> {
10+
const reader = stream.getReader();
11+
12+
try {
13+
while (true) {
14+
const { done, value } = await reader.read();
15+
if (done) {
16+
return;
17+
}
18+
yield value;
19+
}
20+
} finally {
21+
reader.releaseLock();
22+
}
23+
}
24+
25+
export function ensureAsyncIterabe<T>(streamLike: ReadableStreamLike<T>): AsyncIterable<T> {
26+
if (isReadableStream(streamLike)) {
27+
return asyncIterableFromStream(streamLike);
28+
} else {
29+
return streamLike;
30+
}
31+
}

test/readme.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { deepStrictEqual } from "assert";
2-
import { encode, decode } from "../src";
2+
import { encode, decode } from "@msgpack/msgpack";
33

44
describe("README", () => {
5-
context("#synopsis", () => {
5+
context("## Synopsis", () => {
66
it("runs", () => {
77
const object = {
88
nil: null,

test/whatwg-streams.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { deepStrictEqual } from "assert";
2+
import { decodeAsync, encode, decodeArrayStream } from "@msgpack/msgpack";
3+
4+
describe("whatwg streams", () => {
5+
before(function() {
6+
if (typeof ReadableStream === "undefined") {
7+
this.skip();
8+
}
9+
});
10+
11+
context("decodeAsync", async () => {
12+
const data = [1, 2, 3];
13+
const encoded = encode(data);
14+
const stream = new ReadableStream({
15+
start(controller) {
16+
for (const byte of encoded) {
17+
controller.enqueue([byte]);
18+
}
19+
controller.close();
20+
},
21+
});
22+
23+
const items = [];
24+
for await (const item of decodeArrayStream(stream)) {
25+
items.push(item);
26+
}
27+
deepStrictEqual(items, data);
28+
});
29+
30+
it("reads from stream", async () => {
31+
const data = [1, 2, 3];
32+
const encoded = encode(data);
33+
const stream = new ReadableStream({
34+
start(controller) {
35+
for (const byte of encoded) {
36+
controller.enqueue([byte]);
37+
}
38+
controller.close();
39+
},
40+
});
41+
42+
deepStrictEqual(await decodeAsync(stream), data);
43+
});
44+
});

0 commit comments

Comments
 (0)