@@ -21,6 +21,7 @@ import {
2121import type { MetricsV1Input } from "@internal/clickhouse" ;
2222import { logger } from "~/services/logger.server" ;
2323import { clickhouseClient } from "~/services/clickhouseInstance.server" ;
24+ import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server" ;
2425import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server" ;
2526import {
2627 clickhouseEventRepository ,
@@ -47,6 +48,7 @@ class OTLPExporter {
4748 private readonly _eventRepository : EventRepository ,
4849 private readonly _clickhouseEventRepository : ClickhouseEventRepository ,
4950 private readonly _clickhouseEventRepositoryV2 : ClickhouseEventRepository ,
51+ private readonly _metricsFlushScheduler : DynamicFlushScheduler < MetricsV1Input > ,
5052 private readonly _verbose : boolean ,
5153 private readonly _spanAttributeValueLengthLimit : number
5254 ) {
@@ -87,7 +89,7 @@ class OTLPExporter {
8789 span . setAttribute ( "metric_row_count" , rows . length ) ;
8890
8991 if ( rows . length > 0 ) {
90- await clickhouseClient . metrics . insert ( rows ) ;
92+ this . _metricsFlushScheduler . addToBatch ( rows ) ;
9193 }
9294
9395 return ExportMetricsServiceResponse . create ( ) ;
@@ -490,7 +492,7 @@ function convertMetricsToClickhouseRows(
490492 if ( metric . gauge ) {
491493 for ( const dp of metric . gauge . dataPoints ) {
492494 const value : number =
493- ( dp . asDouble ?? 0 ) !== 0 ? dp . asDouble ! : dp . asInt !== BigInt ( 0 ) ? Number ( dp . asInt ) : 0 ;
495+ dp . asDouble !== undefined ? dp . asDouble : dp . asInt !== undefined ? Number ( dp . asInt ) : 0 ;
494496 const resolved = resolveDataPointContext ( dp . attributes ?? [ ] , resourceCtx ) ;
495497
496498 rows . push ( {
@@ -515,7 +517,7 @@ function convertMetricsToClickhouseRows(
515517 if ( metric . sum ) {
516518 for ( const dp of metric . sum . dataPoints ) {
517519 const value : number =
518- ( dp . asDouble ?? 0 ) !== 0 ? dp . asDouble ! : dp . asInt !== BigInt ( 0 ) ? Number ( dp . asInt ) : 0 ;
520+ dp . asDouble !== undefined ? dp . asDouble : dp . asInt !== undefined ? Number ( dp . asInt ) : 0 ;
519521 const resolved = resolveDataPointContext ( dp . attributes ?? [ ] , resourceCtx ) ;
520522
521523 rows . push ( {
@@ -1133,10 +1135,22 @@ function hasUnpairedSurrogateAtEnd(str: string): boolean {
11331135export const otlpExporter = singleton ( "otlpExporter" , initializeOTLPExporter ) ;
11341136
11351137function initializeOTLPExporter ( ) {
1138+ const metricsFlushScheduler = new DynamicFlushScheduler < MetricsV1Input > ( {
1139+ batchSize : env . METRICS_CLICKHOUSE_BATCH_SIZE ,
1140+ flushInterval : env . METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS ,
1141+ callback : async ( _flushId , batch ) => {
1142+ await clickhouseClient . metrics . insert ( batch ) ;
1143+ } ,
1144+ minConcurrency : 1 ,
1145+ maxConcurrency : env . METRICS_CLICKHOUSE_MAX_CONCURRENCY ,
1146+ loadSheddingEnabled : false ,
1147+ } ) ;
1148+
11361149 return new OTLPExporter (
11371150 eventRepository ,
11381151 clickhouseEventRepository ,
11391152 clickhouseEventRepositoryV2 ,
1153+ metricsFlushScheduler ,
11401154 process . env . OTLP_EXPORTER_VERBOSE === "1" ,
11411155 process . env . SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT
11421156 ? parseInt ( process . env . SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT , 10 )
0 commit comments