From 2dd59e1179f08e9e5477f75373ad3dfe8f23b742 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 17:15:03 +0100 Subject: [PATCH 1/8] Improve KafkaMessageBatchStream memory handling by using transform and callback approach --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 98 +++++++++------- .../lib/utils/KafkaMessageBatchStream.ts | 111 +++++++++--------- 2 files changed, 109 insertions(+), 100 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 8e604e5a..a60190ab 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -12,22 +12,26 @@ describe('KafkaMessageBatchStream', () => { })) // When - const batchStream = new KafkaMessageBatchStream({ - batchSize: 3, - timeoutMilliseconds: 10000, - }) // Setting big timeout to check batch size only - const receivedBatches: MessageBatch[] = [] - const dataFetchingPromise = new Promise((resolve) => { - batchStream.on('data', (batch) => { + let resolvePromise: () => void + const dataFetchingPromise = new Promise((resolve) => { + resolvePromise = resolve + }) + + const batchStream = new KafkaMessageBatchStream( + (batch) => { receivedBatches.push(batch) // We expect 3 batches and last message waiting in the stream if (receivedBatches.length >= 3) { - resolve(null) + resolvePromise() } - }) - }) + }, + { + batchSize: 3, + timeoutMilliseconds: 10000, + }, + ) // Setting big timeout to check batch size only for (const message of messages) { batchStream.write(message) @@ -54,23 +58,25 @@ describe('KafkaMessageBatchStream', () => { })) // When - const batchStream = new KafkaMessageBatchStream({ - batchSize: 1000, - timeoutMilliseconds: 500, - }) // Setting big batch size to check timeout only - const receivedBatches: MessageBatch[] = [] - batchStream.on('data', (batch) => { - receivedBatches.push(batch) - }) + + const batchStream = new KafkaMessageBatchStream( + (batch) => { + receivedBatches.push(batch) + }, + { + batchSize: 1000, + timeoutMilliseconds: 100, + }, + ) // Setting big batch size to check timeout only for (const message of messages) { batchStream.write(message) } - // Sleep 1 seconds to let the timeout trigger + // Sleep to let the timeout trigger await new Promise((resolve) => { - setTimeout(resolve, 1000) + setTimeout(resolve, 150) }) // Then @@ -104,16 +110,16 @@ describe('KafkaMessageBatchStream', () => { ] // When - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ - batchSize: 2, - timeoutMilliseconds: 10000, - }) // Setting big timeout to check batch size only - const receivedBatchesByTopicPartition: Record = {} - let receivedMessagesCounter = 0 - const dataFetchingPromise = new Promise((resolve) => { - batchStream.on('data', (batch) => { + + let resolvePromise: () => void + const dataFetchingPromise = new Promise((resolve) => { + resolvePromise = resolve + }) + + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( + (batch) => { const key = `${batch.topic}:${batch.partition}` if (!receivedBatchesByTopicPartition[key]) { receivedBatchesByTopicPartition[key] = [] @@ -123,10 +129,14 @@ describe('KafkaMessageBatchStream', () => { // We expect 5 batches and last message waiting in the stream receivedMessagesCounter++ if (receivedMessagesCounter >= 5) { - resolve(null) + resolvePromise() } - }) - }) + }, + { + batchSize: 2, + timeoutMilliseconds: 10000, + }, + ) // Setting big 
timeout to check batch size only for (const message of messages) { batchStream.write(message) @@ -177,25 +187,29 @@ describe('KafkaMessageBatchStream', () => { ] // When - const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({ - batchSize: 2, - timeoutMilliseconds: 10000, - }) // Setting big timeout to check batch size only - const receivedBatches: any[] = [] - let receivedBatchesCounter = 0 - const dataFetchingPromise = new Promise((resolve) => { - batchStream.on('data', (batch) => { + + let resolvePromise: () => void + const dataFetchingPromise = new Promise((resolve) => { + resolvePromise = resolve + }) + + const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>( + (batch) => { receivedBatches.push(batch) // We expect 4 batches (2 per partition) receivedBatchesCounter++ if (receivedBatchesCounter >= 4) { - resolve(null) + resolvePromise() } - }) - }) + }, + { + batchSize: 2, + timeoutMilliseconds: 10000, + }, + ) // Setting big timeout to check batch size only for (const message of messages) { batchStream.write(message) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 5ae29df8..9a9fd730 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -1,6 +1,4 @@ -import { Duplex } from 'node:stream' - -type CallbackFunction = (error?: Error | null) => void +import { Transform } from 'node:stream' // Topic and partition are required for the stream to work properly type MessageWithTopicAndPartition = { topic: string; partition: number } @@ -11,103 +9,100 @@ export type KafkaMessageBatchOptions = { } export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } - -export interface KafkaMessageBatchStream - extends Duplex { - // biome-ignore lint/suspicious/noExplicitAny: compatible with Duplex definition - on(event: string | symbol, listener: (...args: any[]) => void): this - on(event: 'data', listener: (chunk: MessageBatch) => void): this - - push(chunk: MessageBatch | null): boolean -} +export type OnMessageBatchCallback = ( + batch: MessageBatch, +) => Promise | void /** * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. + * + * This implementation uses Transform stream which properly handles backpressure by design. + * When the downstream consumer is slow, the stream will automatically pause accepting new messages + * until the consumer catches up, preventing memory leaks and OOM errors. 
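+ *
+ * A minimal usage sketch (illustrative only; `handleBatch` and `incomingMessages`
+ * are assumed to exist in the caller's scope and are not part of this module):
+ *
+ * @example
+ * const stream = new KafkaMessageBatchStream(
+ *   async (batch) => handleBatch(batch), // invoked once per flushed batch
+ *   { batchSize: 100, timeoutMilliseconds: 1000 },
+ * )
+ * for (const message of incomingMessages) stream.write(message) // each message must carry topic and partition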
*/ -// biome-ignore lint/suspicious/noUnsafeDeclarationMerging: merging interface with class to add strong typing for 'data' event -export class KafkaMessageBatchStream extends Duplex { +export class KafkaMessageBatchStream< + TMessage extends MessageWithTopicAndPartition, +> extends Transform { + private readonly onBatch: OnMessageBatchCallback private readonly batchSize: number private readonly timeout: number private readonly currentBatchPerTopicPartition: Record private readonly batchTimeoutPerTopicPartition: Record - constructor(options: { batchSize: number; timeoutMilliseconds: number }) { + constructor( + onBatch: OnMessageBatchCallback, + options: { batchSize: number; timeoutMilliseconds: number }, + ) { super({ objectMode: true }) + this.onBatch = onBatch this.batchSize = options.batchSize this.timeout = options.timeoutMilliseconds this.currentBatchPerTopicPartition = {} this.batchTimeoutPerTopicPartition = {} } - override _read() { - // No-op, as we push data when we have a full batch or timeout - } - - override _write(message: TMessage, _encoding: BufferEncoding, callback: CallbackFunction) { - const key = this.getTopicPartitionKey(message.topic, message.partition) + override async _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { + const key = getTopicPartitionKey(message.topic, message.partition) - if (!this.currentBatchPerTopicPartition[key]) { - this.currentBatchPerTopicPartition[key] = [message] - } else { - // biome-ignore lint/style/noNonNullAssertion: non-existing entry is handled above - this.currentBatchPerTopicPartition[key]!.push(message) - } + // Accumulate message + if (!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] + // biome-ignore lint/style/noNonNullAssertion: non-existing entry is handled above + this.currentBatchPerTopicPartition[key]!.push(message) - // biome-ignore lint/style/noNonNullAssertion: we ensure above that the array is defined - if (this.currentBatchPerTopicPartition[key]!.length >= this.batchSize) { - this.flushCurrentBatchMessages(message.topic, message.partition) - return callback(null) + // Check if batch is complete by size + if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { + await this.flushCurrentBatchMessages(message.topic, message.partition) + callback() + return } + // Start timeout for this partition if not already started if (!this.batchTimeoutPerTopicPartition[key]) { - this.batchTimeoutPerTopicPartition[key] = setTimeout(() => { - this.flushCurrentBatchMessages(message.topic, message.partition) - }, this.timeout) + this.batchTimeoutPerTopicPartition[key] = setTimeout( + () => this.flushCurrentBatchMessages(message.topic, message.partition), + this.timeout, + ) } - callback(null) + callback() } - // Write side is closed, flush the remaining messages - override _final(callback: CallbackFunction) { - this.flushAllBatches() - this.push(null) // End readable side + // Flush all remaining batches when stream is closing + override async _flush(callback: () => void) { + await this.flushAllBatches() callback() } - private flushAllBatches() { + private async flushAllBatches() { for (const key of Object.keys(this.currentBatchPerTopicPartition)) { - const { topic, partition } = this.splitTopicPartitionKey(key) - this.flushCurrentBatchMessages(topic, partition) + const { topic, partition } = splitTopicPartitionKey(key) + await this.flushCurrentBatchMessages(topic, partition) } } - private flushCurrentBatchMessages(topic: string, partition: number) { - 
const key = this.getTopicPartitionKey(topic, partition) + private async flushCurrentBatchMessages(topic: string, partition: number) { + const key = getTopicPartitionKey(topic, partition) + // Clear timeout if (this.batchTimeoutPerTopicPartition[key]) { clearTimeout(this.batchTimeoutPerTopicPartition[key]) this.batchTimeoutPerTopicPartition[key] = undefined } - if (!this.currentBatchPerTopicPartition[key]?.length) { - return - } + const messages = this.currentBatchPerTopicPartition[key] ?? [] - this.push({ topic, partition, messages: this.currentBatchPerTopicPartition[key] }) + // Push the batch downstream + await this.onBatch({ topic, partition, messages }) this.currentBatchPerTopicPartition[key] = [] } +} - private getTopicPartitionKey(topic: string, partition: number): string { - return `${topic}:${partition}` - } - - private splitTopicPartitionKey(key: string): { topic: string; partition: number } { - const [topic, partition] = key.split(':') - if (!topic || !partition) { - throw new Error('Invalid topic-partition key format') - } - return { topic, partition: Number.parseInt(partition, 10) } +const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` +const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { + const [topic, partition] = key.split(':') + if (!topic || !partition) { + throw new Error('Invalid topic-partition key format') } + return { topic, partition: Number.parseInt(partition, 10) } } From ccc94dbee43b3bdaf134b043fd5afe36538b71c4 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 17:21:04 +0100 Subject: [PATCH 2/8] Using new batch processing class --- packages/kafka/lib/AbstractKafkaConsumer.ts | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index a64d5e0d..394c27e8 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -188,14 +188,19 @@ export abstract class AbstractKafkaConsumer< }) this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics }) + this.consumerStream.on('error', (error) => this.handlerError(error)) + if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) { this.messageBatchStream = new KafkaMessageBatchStream< DeserializedMessage> - >({ - batchSize: this.options.batchProcessingOptions.batchSize, - timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds, - }) + >( + (batch) => + this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)), + this.options.batchProcessingOptions, + ) this.consumerStream.pipe(this.messageBatchStream) + } else { + this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) } } catch (error) { throw new InternalError({ @@ -204,14 +209,6 @@ export abstract class AbstractKafkaConsumer< cause: error, }) } - - if (this.options.batchProcessingEnabled && this.messageBatchStream) { - this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error)) - } else { - this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error)) - } - - this.consumerStream.on('error', (error) => this.handlerError(error)) } private async handleSyncStream( From b36b809447f745cd7c78bb6fa1f07a924f5e6eb0 Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 17:49:57 +0100 Subject: [PATCH 3/8] Coverage fix ---
packages/kafka/lib/AbstractKafkaConsumer.ts | 17 ++++------------- .../kafka/lib/utils/KafkaMessageBatchStream.ts | 7 ++++--- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 394c27e8..cc69a009 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -149,11 +149,12 @@ export abstract class AbstractKafkaConsumer< if (!this.consumerStream && !this.messageBatchStream) return false try { return this.consumer.isConnected() + /* v8 ignore start */ } catch (_) { // this should not happen, but if so it means the consumer is not healthy - /* v8 ignore next */ return false } + /* v8 ignore stop */ } /** @@ -165,11 +166,12 @@ export abstract class AbstractKafkaConsumer< if (!this.consumerStream && !this.messageBatchStream) return false try { return this.consumer.isActive() + /* v8 ignore start */ } catch (_) { // this should not happen, but if so it means the consumer is not healthy - /* v8 ignore next */ return false } + /* v8 ignore stop */ } async init(): Promise { @@ -221,16 +223,6 @@ export abstract class AbstractKafkaConsumer< ) } } - private async handleSyncStreamBatch( - stream: KafkaMessageBatchStream>>, - ): Promise { - for await (const messageBatch of stream) { - await this.consume( - messageBatch.topic, - messageBatch.messages as DeserializedMessage>, - ) - } - } async close(): Promise { if (!this.consumerStream && !this.messageBatchStream) { @@ -288,7 +280,6 @@ export abstract class AbstractKafkaConsumer< const firstMessage = validMessages[0]! const requestContext = this.getRequestContext(firstMessage) - /* v8 ignore next */ const transactionId = randomUUID() this.transactionObservabilityManager?.start(this.buildTransactionName(topic), transactionId) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 9a9fd730..8a52c91f 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -101,8 +101,9 @@ export class KafkaMessageBatchStream< const getTopicPartitionKey = (topic: string, partition: number): string => `${topic}:${partition}` const splitTopicPartitionKey = (key: string): { topic: string; partition: number } => { const [topic, partition] = key.split(':') - if (!topic || !partition) { - throw new Error('Invalid topic-partition key format') - } + /* v8 ignore start */ + if (!topic || !partition) throw new Error('Invalid topic-partition key format') + /* v8 ignore stop */ + return { topic, partition: Number.parseInt(partition, 10) } } From 7353ba585d2abc8f27aa0976bb03557f1934d3ba Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 20:13:52 +0100 Subject: [PATCH 4/8] Trying to fix backpressure issues due to timeout flush --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 8 ++++- .../lib/utils/KafkaMessageBatchStream.ts | 29 ++++++++++++++----- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index a60190ab..427a60ea 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -22,10 +22,11 @@ describe('KafkaMessageBatchStream', () => { const batchStream = new KafkaMessageBatchStream( (batch) => { receivedBatches.push(batch) - // We expect 3 batches and last message waiting
in the stream + // We expect 3 batches and the last message waiting in the stream if (receivedBatches.length >= 3) { resolvePromise() } + return Promise.resolve() }, { batchSize: 3, @@ -63,6 +64,7 @@ describe('KafkaMessageBatchStream', () => { const batchStream = new KafkaMessageBatchStream( (batch) => { receivedBatches.push(batch) + return Promise.resolve() }, { batchSize: 1000, @@ -131,6 +133,8 @@ describe('KafkaMessageBatchStream', () => { if (receivedMessagesCounter >= 5) { resolvePromise() } + + return Promise.resolve() }, { batchSize: 2, @@ -204,6 +208,8 @@ describe('KafkaMessageBatchStream', () => { if (receivedBatchesCounter >= 4) { resolvePromise() } + + return Promise.resolve() }, { batchSize: 2, diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 8a52c91f..934e6712 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -9,9 +9,7 @@ export type KafkaMessageBatchOptions = { } export type MessageBatch = { topic: string; partition: number; messages: TMessage[] } -export type OnMessageBatchCallback = ( - batch: MessageBatch, -) => Promise | void +export type OnMessageBatchCallback = (batch: MessageBatch) => Promise /** * Collects messages in batches based on provided batchSize and flushes them when messages amount or timeout is reached. @@ -30,6 +28,8 @@ export class KafkaMessageBatchStream< private readonly currentBatchPerTopicPartition: Record private readonly batchTimeoutPerTopicPartition: Record + private readonly timeoutBatchesBeingProcessed: Map> = new Map() + constructor( onBatch: OnMessageBatchCallback, options: { batchSize: number; timeoutMilliseconds: number }, @@ -45,12 +45,19 @@ export class KafkaMessageBatchStream< override async _transform(message: TMessage, _encoding: BufferEncoding, callback: () => void) { const key = getTopicPartitionKey(message.topic, message.partition) - // Accumulate message + // Wait for all pending timeout flushes to complete to maintain backpressure + if (this.timeoutBatchesBeingProcessed.size > 0) { + // Capture a snapshot of current promises to avoid race conditions with new timeouts + const promises = Array.from(this.timeoutBatchesBeingProcessed.values()) + // Wait for all to complete and then clean up from the map + await Promise.all(promises) + } + + // Accumulate the message if (!this.currentBatchPerTopicPartition[key]) this.currentBatchPerTopicPartition[key] = [] - // biome-ignore lint/style/noNonNullAssertion: non-existing entry is handled above - this.currentBatchPerTopicPartition[key]!.push(message) + this.currentBatchPerTopicPartition[key].push(message) - // Check if batch is complete by size + // Check if the batch is complete by size if (this.currentBatchPerTopicPartition[key].length >= this.batchSize) { await this.flushCurrentBatchMessages(message.topic, message.partition) callback() @@ -60,7 +67,13 @@ export class KafkaMessageBatchStream< // Start timeout for this partition if not already started if (!this.batchTimeoutPerTopicPartition[key]) { this.batchTimeoutPerTopicPartition[key] = setTimeout( - () => this.flushCurrentBatchMessages(message.topic, message.partition), + () => + this.timeoutBatchesBeingProcessed.set( + key, + this.flushCurrentBatchMessages(message.topic, message.partition).finally(() => + this.timeoutBatchesBeingProcessed.delete(key), + ), + ), this.timeout, ) } From 9a3754129d2710ef625e4eaaa49dd9f183e6effd Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 
20:32:43 +0100 Subject: [PATCH 5/8] name improvement --- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 934e6712..556ba2ea 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -28,7 +28,7 @@ export class KafkaMessageBatchStream< private readonly currentBatchPerTopicPartition: Record private readonly batchTimeoutPerTopicPartition: Record - private readonly timeoutBatchesBeingProcessed: Map> = new Map() + private readonly timeoutProcessingPromises: Map> = new Map() constructor( onBatch: OnMessageBatchCallback, @@ -46,9 +46,9 @@ export class KafkaMessageBatchStream< const key = getTopicPartitionKey(message.topic, message.partition) // Wait for all pending timeout flushes to complete to maintain backpressure - if (this.timeoutBatchesBeingProcessed.size > 0) { + if (this.timeoutProcessingPromises.size > 0) { // Capture a snapshot of current promises to avoid race conditions with new timeouts - const promises = Array.from(this.timeoutBatchesBeingProcessed.values()) + const promises = Array.from(this.timeoutProcessingPromises.values()) // Wait for all to complete and then clean up from the map await Promise.all(promises) } @@ -68,10 +68,10 @@ export class KafkaMessageBatchStream< if (!this.batchTimeoutPerTopicPartition[key]) { this.batchTimeoutPerTopicPartition[key] = setTimeout( () => - this.timeoutBatchesBeingProcessed.set( + this.timeoutProcessingPromises.set( key, this.flushCurrentBatchMessages(message.topic, message.partition).finally(() => - this.timeoutBatchesBeingProcessed.delete(key), + this.timeoutProcessingPromises.delete(key), ), ), this.timeout, From f210ce3671c64ffb43e3c0aac5afe273c36fb2ce Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Thu, 8 Jan 2026 20:48:09 +0100 Subject: [PATCH 6/8] Adding test to verify proper timeout flush handling --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 62 +++++++++++++++++++ .../lib/utils/KafkaMessageBatchStream.ts | 11 +--- 2 files changed, 65 insertions(+), 8 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 427a60ea..9d885132 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,4 +1,8 @@ +import { waitAndRetry } from '@lokalise/universal-ts-utils/node' +import {setTimeout as sleep} from 'node:timers/promises' import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' +import {pipeline, Readable} from "node:stream"; +import {promisify} from "node:util"; describe('KafkaMessageBatchStream', () => { it('should batch messages based on batch size', async () => { @@ -231,4 +235,62 @@ describe('KafkaMessageBatchStream', () => { { topic, partition: 1, messages: [messages[5], messages[7]] }, ]) }) + + it('should handle backpressure correctly when timeout flush is slow', async () => { + // Given + const topic = 'test-topic' + const messages = Array.from({ length: 6 }, (_, i) => ({ + id: i + 1, + content: `Message ${i + 1}`, + topic, + partition: 0, + })) + + const batchStartTimes: number[] = [] + const batchEndTimes: number[] = [] + let batchesProcessing = 0 + let maxConcurrentBatches = 0 + + const batchStream = new KafkaMessageBatchStream( + async (_batch) => { + 
batchStartTimes.push(Date.now()) + batchesProcessing++ + maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing) + + // Simulate batch processing (50ms per batch) + await new Promise((resolve) => setTimeout(resolve, 50)) + + batchEndTimes.push(Date.now()) + batchesProcessing-- + }, + { + batchSize: 1000, // Large batch size to never trigger size-based flushing + timeoutMilliseconds: 10, // Short timeout to trigger flush after each message + }, + ) + + // When: Write messages with 80ms delay between them (longer than timeout + processing) + // This ensures each message triggers its own timeout flush before the next arrives + for (const message of messages) { + batchStream.write(message) + await sleep(80) + } + + // Wait until all 6 batches have been processed (one per message) + await waitAndRetry(() => batchEndTimes.length >= 6, 50, 20) + + // Then: Verify that batches never processed in parallel (backpressure working) + expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time + + // Verify that batches were processed sequentially (each starts after previous ends) + for (let i = 1; i < batchStartTimes.length; i++) { + const previousEndTime = batchEndTimes[i - 1] + const currentStartTime = batchStartTimes[i] + // Current batch must start after previous batch finished + expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 0) + } + + // Verify we got exactly 6 batches (one per message via timeout) + expect(batchEndTimes.length).toEqual(6) + }) }) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 556ba2ea..081ce717 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -48,9 +48,9 @@ export class KafkaMessageBatchStream< // Wait for all pending timeout flushes to complete to maintain backpressure if (this.timeoutProcessingPromises.size > 0) { // Capture a snapshot of current promises to avoid race conditions with new timeouts - const promises = Array.from(this.timeoutProcessingPromises.values()) + const promiseEntries = Array.from(this.timeoutProcessingPromises.entries()) // Wait for all to complete and then clean up from the map - await Promise.all(promises) + await Promise.all(promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k)))) } // Accumulate the message @@ -68,12 +68,7 @@ export class KafkaMessageBatchStream< if (!this.batchTimeoutPerTopicPartition[key]) { this.batchTimeoutPerTopicPartition[key] = setTimeout( () => - this.timeoutProcessingPromises.set( - key, - this.flushCurrentBatchMessages(message.topic, message.partition).finally(() => - this.timeoutProcessingPromises.delete(key), - ), - ), + this.timeoutProcessingPromises.set(key, this.flushCurrentBatchMessages(message.topic, message.partition)), this.timeout, ) } From 846be5901aaaab6334ba8f2eb970a07f7ee77c2b Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 9 Jan 2026 09:25:08 +0100 Subject: [PATCH 7/8] Lint fix --- packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts | 4 +--- packages/kafka/lib/utils/KafkaMessageBatchStream.ts | 9 +++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 9d885132..6295532d 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,8 
+1,6 @@ +import { setTimeout as sleep } from 'node:timers/promises' import { waitAndRetry } from '@lokalise/universal-ts-utils/node' -import {setTimeout as sleep} from 'node:timers/promises' import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' -import {pipeline, Readable} from "node:stream"; -import {promisify} from "node:util"; describe('KafkaMessageBatchStream', () => { it('should batch messages based on batch size', async () => { diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts index 081ce717..a6c43d0a 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.ts @@ -50,7 +50,9 @@ export class KafkaMessageBatchStream< // Capture a snapshot of current promises to avoid race conditions with new timeouts const promiseEntries = Array.from(this.timeoutProcessingPromises.entries()) // Wait for all to complete and then clean up from the map - await Promise.all(promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k)))) + await Promise.all( + promiseEntries.map(([k, p]) => p.finally(() => this.timeoutProcessingPromises.delete(k))), + ) } // Accumulate the message @@ -68,7 +70,10 @@ export class KafkaMessageBatchStream< if (!this.batchTimeoutPerTopicPartition[key]) { this.batchTimeoutPerTopicPartition[key] = setTimeout( () => - this.timeoutProcessingPromises.set(key, this.flushCurrentBatchMessages(message.topic, message.partition)), + this.timeoutProcessingPromises.set( + key, + this.flushCurrentBatchMessages(message.topic, message.partition), + ), this.timeout, ) } From b47fe8811cef7a28e933653f6b658b4b93baeebf Mon Sep 17 00:00:00 2001 From: CarlosGamero Date: Fri, 9 Jan 2026 09:46:49 +0100 Subject: [PATCH 8/8] Improving tests --- .../lib/utils/KafkaMessageBatchStream.spec.ts | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts index 6295532d..ccc88e03 100644 --- a/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts +++ b/packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts @@ -1,4 +1,4 @@ -import { setTimeout as sleep } from 'node:timers/promises' +import { setTimeout } from 'node:timers/promises' import { waitAndRetry } from '@lokalise/universal-ts-utils/node' import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts' @@ -79,9 +79,7 @@ describe('KafkaMessageBatchStream', () => { } // Sleep to let the timeout trigger - await new Promise((resolve) => { - setTimeout(resolve, 150) - }) + await setTimeout(150) // Then expect(receivedBatches).toEqual([{ topic, partition: 0, messages }]) @@ -244,19 +242,22 @@ describe('KafkaMessageBatchStream', () => { partition: 0, })) - const batchStartTimes: number[] = [] - const batchEndTimes: number[] = [] - let batchesProcessing = 0 - let maxConcurrentBatches = 0 + const batchStartTimes: number[] = [] // Track start times of batch processing + const batchEndTimes: number[] = [] // Track end times of batch processing + const batchMessageCounts: number[] = [] // Track number of messages per batch + let maxConcurrentBatches = 0 // Track max concurrent batches + let batchesProcessing = 0 const batchStream = new KafkaMessageBatchStream( - async (_batch) => { + async (batch) => { batchStartTimes.push(Date.now()) + batchMessageCounts.push(batch.messages.length) + batchesProcessing++ 
maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing) // Simulate batch processing (50ms per batch) - await new Promise((resolve) => setTimeout(resolve, 50)) + await setTimeout(50) batchEndTimes.push(Date.now()) batchesProcessing-- @@ -267,28 +268,32 @@ describe('KafkaMessageBatchStream', () => { }, ) - // When: Write messages with 80ms delay between them (longer than timeout + processing) - // This ensures each message triggers its own timeout flush before the next arrives + // When: Write messages with 20ms delay between them + // Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation for (const message of messages) { batchStream.write(message) - await sleep(80) + await setTimeout(20) } - // Wait until all 6 batches have been processed (one per message) - await waitAndRetry(() => batchEndTimes.length >= 6, 50, 20) + // Then + // Wait until all 3 batches have been processed + await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20) + + // Backpressure causes messages to accumulate while previous batch processes: + // - Batch 1: Message 1 (flushed at 10ms timeout) + // - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms) + // - Batch 3: Messages 5-6 (accumulated during Batch 2 processing) + expect(batchMessageCounts).toEqual([1, 3, 2]) - // Then: Verify that batches never processed in parallel (backpressure working) + // Verify that batches never processed in parallel (backpressure working) expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time // Verify that batches were processed sequentially (each starts after previous ends) for (let i = 1; i < batchStartTimes.length; i++) { const previousEndTime = batchEndTimes[i - 1] const currentStartTime = batchStartTimes[i] - // Current batch must start after previous batch finished + // The current batch must start after the previous batch finished expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 0) } - - // Verify we got exactly 6 batches (one per message via timeout) - expect(batchEndTimes.length).toEqual(6) }) })