@@ -82,7 +82,7 @@ type TaskRunInsert = {
8282export type RunsReplicationServiceEvents = {
8383 message : [ { lsn : string ; message : PgoutputMessage ; service : RunsReplicationService } ] ;
8484 batchFlushed : [
85- { flushId : string ; taskRunInserts : TaskRunInsertArray [ ] ; payloadInserts : PayloadInsertArray [ ] } ,
85+ { flushId : string ; taskRunInserts : TaskRunInsertArray [ ] ; payloadInserts : PayloadInsertArray [ ] }
8686 ] ;
8787} ;
8888
@@ -214,34 +214,14 @@ export class RunsReplicationService {
214214 flushInterval : options . flushIntervalMs ?? 100 ,
215215 maxConcurrency : options . maxFlushConcurrency ?? 100 ,
216216 callback : this . #flushBatch. bind ( this ) ,
217- // we can do some pre-merging to reduce the amount of data we need to send to clickhouse
218- mergeBatch : ( existingBatch : TaskRunInsert [ ] , newBatch : TaskRunInsert [ ] ) => {
219- const merged = new Map < string , TaskRunInsert > ( ) ;
220-
221- for ( const item of existingBatch ) {
222- const key = `${ item . event } _${ item . run . id } ` ;
223- merged . set ( key , item ) ;
224- }
225-
226- for ( const item of newBatch ) {
227- if ( ! item ?. run ?. id ) {
228- this . logger . warn ( "Skipping replication event with null run" , { event : item } ) ;
229- continue ;
230- }
231-
232- const key = `${ item . event } _${ item . run . id } ` ;
233- const existingItem = merged . get ( key ) ;
234-
235- // Keep the run with the higher version (latest)
236- // and take the last occurrence for that version.
237- // Items originating from the same DB transaction have the same version.
238- if ( ! existingItem || item . _version >= existingItem . _version ) {
239- merged . set ( key , item ) ;
240- }
241- }
242-
243- return Array . from ( merged . values ( ) ) ;
217+ // Key-based deduplication to reduce duplicates sent to ClickHouse
218+ getKey : ( item ) => {
219+ return `${ item . event } _${ item . run . id } ` ;
244220 } ,
221+ // Keep the run with the higher version (latest)
222+ // and take the last occurrence for that version.
223+ // Items originating from the same DB transaction have the same version.
224+ shouldReplace : ( existing , incoming ) => incoming . _version >= existing . _version ,
245225 logger : new Logger ( "ConcurrentFlushScheduler" , options . logLevel ?? "info" ) ,
246226 tracer : options . tracer ,
247227 } ) ;
@@ -504,7 +484,7 @@ export class RunsReplicationService {
504484 this . _eventsProcessedCounter . add ( 1 , { event_type : event . tag } ) ;
505485 }
506486
507- this . logger . info ( "handle_transaction" , {
487+ this . logger . debug ( "handle_transaction" , {
508488 transaction : {
509489 xid : transaction . xid ,
510490 commitLsn : transaction . commitLsn ,
@@ -974,13 +954,16 @@ export type ConcurrentFlushSchedulerConfig<T> = {
974954 flushInterval : number ;
975955 maxConcurrency ?: number ;
976956 callback : ( flushId : string , batch : T [ ] ) => Promise < void > ;
977- mergeBatch ?: ( existingBatch : T [ ] , newBatch : T [ ] ) => T [ ] ;
957+ /** Key-based deduplication. Return null to skip the item. */
958+ getKey : ( item : T ) => string | null ;
959+ /** Determine if incoming item should replace existing. */
960+ shouldReplace : ( existing : T , incoming : T ) => boolean ;
978961 tracer ?: Tracer ;
979962 logger ?: Logger ;
980963} ;
981964
982965export class ConcurrentFlushScheduler < T > {
983- private currentBatch : T [ ] ;
966+ private batch = new Map < string , T > ( ) ;
984967 private readonly BATCH_SIZE : number ;
985968 private readonly flushInterval : number ;
986969 private readonly MAX_CONCURRENCY : number ;
@@ -995,7 +978,6 @@ export class ConcurrentFlushScheduler<T> {
995978 this . logger = config . logger ?? new Logger ( "ConcurrentFlushScheduler" , "info" ) ;
996979 this . _tracer = config . tracer ?? trace . getTracer ( "concurrent-flush-scheduler" ) ;
997980
998- this . currentBatch = [ ] ;
999981 this . BATCH_SIZE = config . batchSize ;
1000982 this . flushInterval = config . flushInterval ;
1001983 this . MAX_CONCURRENCY = config . maxConcurrency || 1 ;
@@ -1005,9 +987,17 @@ export class ConcurrentFlushScheduler<T> {
1005987 }
1006988
1007989 addToBatch ( items : T [ ] ) : void {
1008- this . currentBatch = this . config . mergeBatch
1009- ? this . config . mergeBatch ( this . currentBatch , items )
1010- : this . currentBatch . concat ( items ) ;
990+ for ( const item of items ) {
991+ const key = this . config . getKey ( item ) ;
992+ if ( key === null ) {
993+ continue ;
994+ }
995+
996+ const existing = this . batch . get ( key ) ;
997+ if ( ! existing || this . config . shouldReplace ( existing , item ) ) {
998+ this . batch . set ( key , item ) ;
999+ }
1000+ }
10111001
10121002 this . #flushNextBatchIfNeeded( ) ;
10131003 }
@@ -1031,11 +1021,16 @@ export class ConcurrentFlushScheduler<T> {
10311021 this . #flushNextBatchIfNeeded( ) ;
10321022 }
10331023
1024+ #getBatchSize( ) : number {
1025+ return this . batch . size ;
1026+ }
1027+
10341028 #flushNextBatchIfNeeded( ) : void {
1035- if ( this . currentBatch . length >= this . BATCH_SIZE || this . _isShutDown ) {
1029+ const currentSize = this . #getBatchSize( ) ;
1030+ if ( currentSize >= this . BATCH_SIZE || this . _isShutDown ) {
10361031 this . logger . debug ( "Batch size threshold reached, initiating flush" , {
10371032 batchSize : this . BATCH_SIZE ,
1038- currentSize : this . currentBatch . length ,
1033+ currentSize,
10391034 isShutDown : this . _isShutDown ,
10401035 } ) ;
10411036
@@ -1060,19 +1055,20 @@ export class ConcurrentFlushScheduler<T> {
10601055 }
10611056
10621057 async #checkAndFlush( ) : Promise < void > {
1063- if ( this . currentBatch . length > 0 ) {
1058+ const currentSize = this . #getBatchSize( ) ;
1059+ if ( currentSize > 0 ) {
10641060 this . logger . debug ( "Periodic flush check triggered" , {
1065- currentBatchSize : this . currentBatch . length ,
1061+ currentBatchSize : currentSize ,
10661062 } ) ;
10671063 await this . #flushNextBatch( ) ;
10681064 }
10691065 }
10701066
10711067 async #flushNextBatch( ) : Promise < void > {
1072- if ( this . currentBatch . length === 0 ) return ;
1068+ if ( this . batch . size === 0 ) return ;
10731069
1074- const batch = this . currentBatch ;
1075- this . currentBatch = [ ] ;
1070+ const batch = Array . from ( this . batch . values ( ) ) ;
1071+ this . batch . clear ( ) ;
10761072
10771073 const callback = this . config . callback ;
10781074
0 commit comments