Commit 4196cd7

Fix flaky streaming test
1 parent 794884b commit 4196cd7

File tree

1 file changed (+40 −38 lines)

test/stream.spec.ts

Lines changed: 40 additions & 38 deletions
@@ -302,7 +302,7 @@ describe("Streaming", () => {
     this.timeout(10000); // Can be slow
 
     const BROTLI_DATA_SIZE_KB = 2048;
-    const inputChunks = generateLargeData(BROTLI_DATA_SIZE_KB, false);
+    const inputChunks = generateLargeData(BROTLI_DATA_SIZE_KB);
     const { outputBeforeEnd, totalOutput } = await testTrueStreaming(
       createBrotliCompressStream(),
       inputChunks
@@ -319,7 +319,7 @@ describe("Streaming", () => {
     this.timeout(10000); // Can be slow
 
     const BROTLI_DECOMPRESS_SIZE_KB = 1024; // 1MB
-    const originalData = Buffer.concat(generateLargeData(BROTLI_DECOMPRESS_SIZE_KB, false));
+    const originalData = Buffer.concat(generateLargeData(BROTLI_DECOMPRESS_SIZE_KB));
     const compressed = await brotliCompress(originalData);
 
     const chunkSize = 16 * 1024;
@@ -494,7 +494,7 @@ describe("Streaming", () => {
     // Create a 1MB buffer
     const size = 1024 * 1024;
     const largeData = new Uint8Array(size);
-    const rng = createSeededRng(0xDEADBEEF);
+    const rng = mulberry32(0xDEADBEEF);
     for (let i = 0; i < size; i++) {
       largeData[i] = Math.floor(rng() * 256);
     }
@@ -626,7 +626,7 @@ describe("Streaming", () => {
     this.timeout(10000);
 
     // Use 4MB for base64 to ensure we get multiple batches (1.5MB batch size)
-    const inputChunks = generateLargeData(4096, false);
+    const inputChunks = generateLargeData(4096);
     const { outputBeforeEnd, totalOutput } = await testTrueStreaming(
       createBase64EncodeStream(),
       inputChunks
@@ -644,7 +644,7 @@ describe("Streaming", () => {
     this.timeout(10000);
 
     // Generate random data, encode to base64, then decode via stream
-    const originalData = Buffer.concat(generateLargeData(4096, false));
+    const originalData = Buffer.concat(generateLargeData(4096));
     const encoded = await encodeBase64(originalData);
 
     const chunkSize = 64 * 1024;
@@ -717,51 +717,63 @@ const STREAMING_TEST_SIZE_KB = 256;
 // Helper to test true streaming behavior - verifies output arrives before all input is sent
 async function testTrueStreaming(
   transformStream: TransformStream<Uint8Array, Uint8Array>,
-  inputChunks: Uint8Array[]
+  inputChunks: Uint8Array[],
+  timeoutMs = 5000
 ): Promise<{ outputBeforeEnd: boolean; totalOutput: Uint8Array }> {
+  if (inputChunks.length < 2) {
+    throw new Error('testTrueStreaming requires at least 2 input chunks');
+  }
+
   const outputChunks: Uint8Array[] = [];
-  let outputReceivedBeforeEnd = false;
-  let inputComplete = false;
+
+  let resolveFirstOutput!: () => void;
+  const firstOutputReceived = new Promise<void>(resolve => {
+    resolveFirstOutput = resolve;
+  });
 
   const writer = transformStream.writable.getWriter();
   const reader = transformStream.readable.getReader();
 
-  // Start reading in background - this allows output to be collected as it arrives
+  // Start reading in background
   const readPromise = (async () => {
     while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      outputChunks.push(value);
-     if (!inputComplete) {
-       outputReceivedBeforeEnd = true;
-     }
+     if (outputChunks.length === 1) resolveFirstOutput();
    }
  })();
 
-  // Write chunks with small delays to allow output processing
-  for (const chunk of inputChunks) {
-    await writer.write(chunk);
-    await new Promise(resolve => setTimeout(resolve, 0));
+  // Write all chunks except the last
+  for (let i = 0; i < inputChunks.length - 1; i++) {
+    await writer.write(inputChunks[i]);
  }
 
-  inputComplete = true;
+  // Wait for output before writing final chunk
+  const outputBeforeEnd = await Promise.race([
+    firstOutputReceived.then(() => true),
+    new Promise<false>(resolve => setTimeout(() => resolve(false), timeoutMs))
+  ]);
+
+  // Write final chunk and close
+  await writer.write(inputChunks[inputChunks.length - 1]);
  await writer.close();
  await readPromise;
 
-  // Combine output chunks
-  const totalLength = outputChunks.reduce((sum, chunk) => sum + chunk.length, 0);
+  // Combine output
+  const totalLength = outputChunks.reduce((sum, c) => sum + c.length, 0);
  const totalOutput = new Uint8Array(totalLength);
  let offset = 0;
  for (const chunk of outputChunks) {
    totalOutput.set(chunk, offset);
    offset += chunk.length;
  }
 
-  return { outputBeforeEnd: outputReceivedBeforeEnd, totalOutput };
+  return { outputBeforeEnd, totalOutput };
 }
 
-// Mulberry32: high-quality 32-bit seeded PRNG that produces incompressible output
-function createSeededRng(seed: number) {
+// Mulberry32: seeded PRNG for deterministic incompressible test data
+function mulberry32(seed: number) {
   return function(): number {
     let t = seed += 0x6D2B79F5;
     t = Math.imul(t ^ t >>> 15, t | 1);
@@ -770,29 +782,19 @@ function createSeededRng(seed: number) {
   };
 }
 
-// Generate large test data that will produce streaming output
-function generateLargeData(sizeKB: number, compressible = true): Uint8Array[] {
-  const chunkSize = 16 * 1024; // 16KB chunks
+// Generate deterministic incompressible test data in 16KB chunks
+function generateLargeData(sizeKB: number): Uint8Array[] {
+  const chunkSize = 16 * 1024;
   const totalSize = sizeKB * 1024;
   const chunks: Uint8Array[] = [];
-  const pattern = 'This is test data that will be repeated to create compressible content. ';
-
-  // Fixed seed for deterministic, reproducible test data
-  const rng = createSeededRng(0x12345678);
+  const rng = mulberry32(0x12345678);
 
   let remaining = totalSize;
   while (remaining > 0) {
     const size = Math.min(chunkSize, remaining);
     const chunk = new Uint8Array(size);
-    if (compressible) {
-      for (let i = 0; i < size; i++) {
-        chunk[i] = pattern.charCodeAt(i % pattern.length);
-      }
-    } else {
-      // Use seeded PRNG for deterministic incompressible data
-      for (let i = 0; i < size; i++) {
-        chunk[i] = Math.floor(rng() * 256);
-      }
+    for (let i = 0; i < size; i++) {
+      chunk[i] = Math.floor(rng() * 256);
     }
     chunks.push(chunk);
     remaining -= size;
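
For reference, Mulberry32 is a well-known public-domain 32-bit seeded PRNG; the hunk above only shows the lines that changed, so the sketch below is the algorithm's standard form rather than the file's verbatim contents:

// Standard Mulberry32 sketch (illustrative, not copied from the diff).
// The same seed always yields the same sequence of floats in [0, 1),
// which is what makes generateLargeData deterministic across test runs.
function mulberry32(seed: number): () => number {
  return function (): number {
    let t = (seed += 0x6D2B79F5);
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296;
  };
}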

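A minimal usage sketch of the revised helper, assuming the Mocha/Chai style and the createBrotliCompressStream factory visible in the hunks above (the test name and assertions are illustrative, not part of this commit):

// Illustrative only: exercises the new first-output race instead of the old
// per-chunk setTimeout(0) heuristic.
it("emits compressed output before the final input chunk", async function () {
  this.timeout(10000);

  const inputChunks = generateLargeData(2048); // deterministic, incompressible data
  const { outputBeforeEnd, totalOutput } = await testTrueStreaming(
    createBrotliCompressStream(), // assumed stream factory from the suite
    inputChunks
  );

  expect(outputBeforeEnd).to.equal(true); // some output arrived before the last write
  expect(totalOutput.length).to.be.greaterThan(0);
});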