11import type {
22 ClickHouse ,
3+ TaskEventDetailedSummaryV1Result ,
34 TaskEventDetailsV1Result ,
45 TaskEventSummaryV1Result ,
56 TaskEventV1Input ,
@@ -48,8 +49,10 @@ import type {
4849 IEventRepository ,
4950 RunPreparedEvent ,
5051 SpanDetail ,
52+ SpanDetailedSummary ,
5153 SpanOverride ,
5254 SpanSummary ,
55+ SpanSummaryCommon ,
5356 TraceAttributes ,
5457 TraceDetailedSummary ,
5558 TraceEventOptions ,
@@ -891,13 +894,6 @@ export class ClickhouseEventRepository implements IEventRepository {
891894 queryBuilder . limit ( this . _config . maximumTraceSummaryViewCount ) ;
892895 }
893896
894- const { query, params } = queryBuilder . build ( ) ;
895-
896- logger . debug ( "ClickhouseEventRepository.getTraceSummary query" , {
897- query,
898- params,
899- } ) ;
900-
901897 const [ queryError , records ] = await queryBuilder . execute ( ) ;
902898
903899 if ( queryError ) {
@@ -908,10 +904,6 @@ export class ClickhouseEventRepository implements IEventRepository {
908904 return ;
909905 }
910906
911- logger . info ( "ClickhouseEventRepository.getTraceSummary" , {
912- records,
913- } ) ;
914-
915907 const recordsGroupedBySpanId = records . reduce ( ( acc , record ) => {
916908 acc [ record . span_id ] = [ ...( acc [ record . span_id ] ?? [ ] ) , record ] ;
917909 return acc ;
@@ -921,21 +913,12 @@ export class ClickhouseEventRepository implements IEventRepository {
921913 let rootSpanId : string | undefined ;
922914
923915 for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
924- logger . debug ( "ClickhouseEventRepository.getTraceSummary recordsGroupedBySpanId" , {
925- spanId,
926- spanRecords,
927- } ) ;
928-
929916 const spanSummary = this . #mergeRecordsIntoSpanSummary( spanId , spanRecords ) ;
930917
931918 if ( ! spanSummary ) {
932919 continue ;
933920 }
934921
935- logger . debug ( "ClickhouseEventRepository.getTraceSummary spanSummary" , {
936- spanSummary,
937- } ) ;
938-
939922 spanSummaries . set ( spanId , spanSummary ) ;
940923
941924 if ( ! rootSpanId && ! spanSummary . parentId ) {
@@ -944,21 +927,13 @@ export class ClickhouseEventRepository implements IEventRepository {
944927 }
945928
946929 if ( ! rootSpanId ) {
947- logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpanId" , {
948- spanSummaries,
949- } ) ;
950-
951930 return ;
952931 }
953932
954933 const spans = Array . from ( spanSummaries . values ( ) ) ;
955934 const rootSpan = spanSummaries . get ( rootSpanId ) ;
956935
957936 if ( ! rootSpan ) {
958- logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpan" , {
959- spanSummaries,
960- } ) ;
961-
962937 return ;
963938 }
964939
@@ -968,11 +943,6 @@ export class ClickhouseEventRepository implements IEventRepository {
968943 return this . #applyAncestorOverrides( span , spanSummaries , overridesBySpanId ) ;
969944 } ) ;
970945
971- logger . info ( "ClickhouseEventRepository.getTraceSummary result" , {
972- rootSpan,
973- spans : finalSpans ,
974- } ) ;
975-
976946 return {
977947 rootSpan,
978948 spans : finalSpans ,
@@ -1008,13 +978,6 @@ export class ClickhouseEventRepository implements IEventRepository {
1008978
1009979 queryBuilder . orderBy ( "start_time ASC" ) ;
1010980
1011- const { query, params } = queryBuilder . build ( ) ;
1012-
1013- logger . debug ( "ClickhouseEventRepository.getSpan query" , {
1014- query,
1015- params,
1016- } ) ;
1017-
1018981 const [ queryError , records ] = await queryBuilder . execute ( ) ;
1019982
1020983 if ( queryError ) {
@@ -1027,13 +990,20 @@ export class ClickhouseEventRepository implements IEventRepository {
1027990
1028991 const span = this . #mergeRecordsIntoSpanDetail( spanId , records ) ;
1029992
1030- logger . info ( "ClickhouseEventRepository.getSpan" , {
1031- span,
1032- } ) ;
1033-
1034993 return span ;
1035994 }
1036995
996+ async getSpanOriginalRunId (
997+ storeTable : TaskEventStoreTable ,
998+ environmentId : string ,
999+ spanId : string ,
1000+ traceId : string ,
1001+ startCreatedAt : Date ,
1002+ endCreatedAt ?: Date
1003+ ) : Promise < string | undefined > {
1004+ return await originalRunIdCache . lookup ( traceId , spanId ) ;
1005+ }
1006+
10371007 #mergeRecordsIntoSpanDetail(
10381008 spanId : string ,
10391009 records : TaskEventDetailsV1Result [ ]
@@ -1132,11 +1102,11 @@ export class ClickhouseEventRepository implements IEventRepository {
11321102 return span ;
11331103 }
11341104
1135- #applyAncestorOverrides(
1136- span : SpanSummary ,
1137- spansById : Map < string , SpanSummary > ,
1105+ #applyAncestorOverrides< TSpanSummary extends SpanSummaryCommon > (
1106+ span : TSpanSummary ,
1107+ spansById : Map < string , TSpanSummary > ,
11381108 overridesBySpanId : Record < string , SpanOverride >
1139- ) : SpanSummary {
1109+ ) : TSpanSummary {
11401110 if ( span . data . level !== "TRACE" ) {
11411111 return span ;
11421112 }
@@ -1152,7 +1122,7 @@ export class ClickhouseEventRepository implements IEventRepository {
11521122 // Now we need to walk the ancestors of the span by span.parentId
11531123 // The first ancestor that is a TRACE span that is "closed" we will use to override the span
11541124 let parentSpanId : string | undefined = span . parentId ;
1155- let overrideSpan : SpanSummary | undefined ;
1125+ let overrideSpan : TSpanSummary | undefined ;
11561126
11571127 while ( parentSpanId ) {
11581128 const parentSpan = spansById . get ( parentSpanId ) ;
@@ -1176,11 +1146,11 @@ export class ClickhouseEventRepository implements IEventRepository {
11761146 return span ;
11771147 }
11781148
1179- #applyAncestorToSpan(
1180- span : SpanSummary ,
1181- overrideSpan : SpanSummary ,
1149+ #applyAncestorToSpan< TSpanSummary extends SpanSummaryCommon > (
1150+ span : TSpanSummary ,
1151+ overrideSpan : TSpanSummary ,
11821152 overridesBySpanId : Record < string , SpanOverride >
1183- ) : SpanSummary {
1153+ ) : TSpanSummary {
11841154 if ( overridesBySpanId [ span . id ] ) {
11851155 return span ;
11861156 }
@@ -1361,7 +1331,182 @@ export class ClickhouseEventRepository implements IEventRepository {
13611331 endCreatedAt ?: Date ,
13621332 options ?: { includeDebugLogs ?: boolean }
13631333 ) : Promise < TraceDetailedSummary | undefined > {
1364- throw new Error ( "ClickhouseEventRepository.getTraceDetailedSummary not implemented" ) ;
1334+ const startCreatedAtWithBuffer = new Date ( startCreatedAt . getTime ( ) - 1000 ) ;
1335+
1336+ const queryBuilder = this . _clickhouse . taskEvents . traceDetailedSummaryQueryBuilder ( ) ;
1337+
1338+ queryBuilder . where ( "environment_id = {environmentId: String}" , { environmentId } ) ;
1339+ queryBuilder . where ( "trace_id = {traceId: String}" , { traceId } ) ;
1340+ queryBuilder . where ( "start_time >= {startCreatedAt: String}" , {
1341+ startCreatedAt : convertDateToNanoseconds ( startCreatedAtWithBuffer ) . toString ( ) ,
1342+ } ) ;
1343+
1344+ if ( endCreatedAt ) {
1345+ queryBuilder . where ( "start_time <= {endCreatedAt: String}" , {
1346+ endCreatedAt : convertDateToNanoseconds ( endCreatedAt ) . toString ( ) ,
1347+ } ) ;
1348+ }
1349+
1350+ if ( options ?. includeDebugLogs === false ) {
1351+ queryBuilder . where ( "kind != {kind: String}" , { kind : "DEBUG_EVENT" } ) ;
1352+ }
1353+
1354+ queryBuilder . orderBy ( "start_time ASC" ) ;
1355+
1356+ if ( this . _config . maximumTraceSummaryViewCount ) {
1357+ queryBuilder . limit ( this . _config . maximumTraceSummaryViewCount ) ;
1358+ }
1359+
1360+ const [ queryError , records ] = await queryBuilder . execute ( ) ;
1361+
1362+ if ( queryError ) {
1363+ throw queryError ;
1364+ }
1365+
1366+ if ( ! records ) {
1367+ return ;
1368+ }
1369+
1370+ const recordsGroupedBySpanId = records . reduce ( ( acc , record ) => {
1371+ acc [ record . span_id ] = [ ...( acc [ record . span_id ] ?? [ ] ) , record ] ;
1372+ return acc ;
1373+ } , { } as Record < string , TaskEventDetailedSummaryV1Result [ ] > ) ;
1374+
1375+ const spanSummaries = new Map < string , SpanDetailedSummary > ( ) ;
1376+ let rootSpanId : string | undefined ;
1377+
1378+ for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
1379+ const spanSummary = this . #mergeRecordsIntoSpanDetailedSummary( spanId , spanRecords ) ;
1380+
1381+ if ( ! spanSummary ) {
1382+ continue ;
1383+ }
1384+
1385+ spanSummaries . set ( spanId , spanSummary ) ;
1386+
1387+ if ( ! rootSpanId && ! spanSummary . parentId ) {
1388+ rootSpanId = spanId ;
1389+ }
1390+ }
1391+
1392+ if ( ! rootSpanId ) {
1393+ return ;
1394+ }
1395+
1396+ const spans = Array . from ( spanSummaries . values ( ) ) ;
1397+
1398+ const overridesBySpanId : Record < string , SpanOverride > = { } ;
1399+ const spanDetailedSummaryMap = new Map < string , SpanDetailedSummary > ( ) ;
1400+
1401+ const finalSpans = spans . map ( ( span ) => {
1402+ const finalSpan = this . #applyAncestorOverrides( span , spanSummaries , overridesBySpanId ) ;
1403+ spanDetailedSummaryMap . set ( span . id , finalSpan ) ;
1404+ return finalSpan ;
1405+ } ) ;
1406+
1407+ // Second pass: build parent-child relationships
1408+ for ( const finalSpan of finalSpans ) {
1409+ if ( finalSpan . parentId ) {
1410+ const parent = spanDetailedSummaryMap . get ( finalSpan . parentId ) ;
1411+ if ( parent ) {
1412+ parent . children . push ( finalSpan ) ;
1413+ }
1414+ }
1415+ }
1416+
1417+ const rootSpan = spanDetailedSummaryMap . get ( rootSpanId ) ;
1418+
1419+ if ( ! rootSpan ) {
1420+ return ;
1421+ }
1422+
1423+ return {
1424+ traceId,
1425+ rootSpan,
1426+ } ;
1427+ }
1428+
1429+ #mergeRecordsIntoSpanDetailedSummary(
1430+ spanId : string ,
1431+ records : TaskEventDetailedSummaryV1Result [ ]
1432+ ) : SpanDetailedSummary | undefined {
1433+ if ( records . length === 0 ) {
1434+ return undefined ;
1435+ }
1436+
1437+ let span : SpanDetailedSummary | undefined ;
1438+
1439+ for ( const record of records ) {
1440+ if ( ! span ) {
1441+ span = {
1442+ id : spanId ,
1443+ parentId : record . parent_span_id ? record . parent_span_id : undefined ,
1444+ runId : record . run_id ,
1445+ data : {
1446+ message : record . message ,
1447+ taskSlug : undefined ,
1448+ duration :
1449+ typeof record . duration === "number" ? record . duration : Number ( record . duration ) ,
1450+ isError : false ,
1451+ isPartial : true , // Partial by default, can only be set to false
1452+ isCancelled : false ,
1453+ startTime : convertClickhouseDateTime64ToJsDate ( record . start_time ) ,
1454+ level : kindToLevel ( record . kind ) ,
1455+ events : [ ] ,
1456+ } ,
1457+ children : [ ] ,
1458+ } ;
1459+ }
1460+
1461+ if ( isLogEvent ( record . kind ) ) {
1462+ span . data . isPartial = false ;
1463+ span . data . isCancelled = false ;
1464+ span . data . isError = record . status === "ERROR" ;
1465+ }
1466+
1467+ const parsedMetadata = this . #parseMetadata( record . metadata ) ;
1468+
1469+ if (
1470+ parsedMetadata &&
1471+ "attemptNumber" in parsedMetadata &&
1472+ typeof parsedMetadata . attemptNumber === "number"
1473+ ) {
1474+ span . data . attemptNumber = parsedMetadata . attemptNumber ;
1475+ }
1476+
1477+ if ( record . kind === "ANCESTOR_OVERRIDE" || record . kind === "SPAN_EVENT" ) {
1478+ // We need to add an event to the span
1479+ span . data . events . push ( {
1480+ name : record . message ,
1481+ time : convertClickhouseDateTime64ToJsDate ( record . start_time ) ,
1482+ properties : parsedMetadata ?? { } ,
1483+ } ) ;
1484+ }
1485+
1486+ if ( record . kind === "SPAN" ) {
1487+ if ( record . status === "ERROR" ) {
1488+ span . data . isError = true ;
1489+ span . data . isPartial = false ;
1490+ span . data . isCancelled = false ;
1491+ } else if ( record . status === "CANCELLED" ) {
1492+ span . data . isCancelled = true ;
1493+ span . data . isPartial = false ;
1494+ span . data . isError = false ;
1495+ } else if ( record . status === "OK" ) {
1496+ span . data . isPartial = false ;
1497+ }
1498+
1499+ if ( record . status !== "PARTIAL" ) {
1500+ span . data . duration =
1501+ typeof record . duration === "number" ? record . duration : Number ( record . duration ) ;
1502+ } else {
1503+ span . data . startTime = convertClickhouseDateTime64ToJsDate ( record . start_time ) ;
1504+ span . data . message = record . message ;
1505+ }
1506+ }
1507+ }
1508+
1509+ return span ;
13651510 }
13661511
13671512 async getRunEvents (
@@ -1373,17 +1518,6 @@ export class ClickhouseEventRepository implements IEventRepository {
13731518 ) : Promise < RunPreparedEvent [ ] > {
13741519 throw new Error ( "ClickhouseEventRepository.getRunEvents not implemented" ) ;
13751520 }
1376-
1377- async getSpanOriginalRunId (
1378- storeTable : TaskEventStoreTable ,
1379- environmentId : string ,
1380- spanId : string ,
1381- traceId : string ,
1382- startCreatedAt : Date ,
1383- endCreatedAt ?: Date
1384- ) : Promise < string | undefined > {
1385- return await originalRunIdCache . lookup ( traceId , spanId ) ;
1386- }
13871521}
13881522
13891523export const convertDateToClickhouseDateTime = ( date : Date ) : string => {
0 commit comments