11import type { ClickHouse , TaskEventSummaryV1Result , TaskEventV1Input } from "@internal/clickhouse" ;
22import { Attributes , startSpan , trace , Tracer } from "@internal/tracing" ;
3- import { DynamicFlushScheduler } from "../dynamicFlushScheduler.server" ;
4- import type {
5- CompleteableTaskRun ,
6- CreateEventInput ,
7- EventBuilder ,
8- ExceptionEventProperties ,
9- IEventRepository ,
10- RunPreparedEvent ,
11- SpanDetail ,
12- SpanSummary ,
13- TaskEventRecord ,
14- TraceAttributes ,
15- TraceDetailedSummary ,
16- TraceEventOptions ,
17- TraceSummary ,
18- } from "./eventRepository.types" ;
19- import { tracePubSub } from "../services/tracePubSub.server" ;
20- import type { TaskEventStoreTable } from "../taskEventStore.server" ;
21- import { logger } from "~/services/logger.server" ;
22- import {
23- generateSpanId ,
24- extractContextFromCarrier ,
25- getNowInNanoseconds ,
26- generateDeterministicSpanId ,
27- calculateDurationFromStart ,
28- generateTraceId ,
29- parseEventsField ,
30- convertDateToNanoseconds ,
31- createExceptionPropertiesFromError ,
32- } from "./common.server" ;
3+ import { createJsonErrorObject } from "@trigger.dev/core/v3/errors" ;
334import { serializeTraceparent } from "@trigger.dev/core/v3/isomorphic" ;
345import {
356 AttemptFailedSpanEvent ,
@@ -44,11 +15,39 @@ import {
4415 TaskEventStyle ,
4516 TaskRunError ,
4617} from "@trigger.dev/core/v3/schemas" ;
18+ import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes" ;
4719import { unflattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes" ;
4820import { TaskEventLevel } from "@trigger.dev/database" ;
21+ import { logger } from "~/services/logger.server" ;
22+ import { DynamicFlushScheduler } from "../dynamicFlushScheduler.server" ;
23+ import { tracePubSub } from "../services/tracePubSub.server" ;
24+ import type { TaskEventStoreTable } from "../taskEventStore.server" ;
25+ import {
26+ calculateDurationFromStart ,
27+ calculateDurationFromStartJsDate ,
28+ convertDateToNanoseconds ,
29+ createExceptionPropertiesFromError ,
30+ extractContextFromCarrier ,
31+ generateDeterministicSpanId ,
32+ generateSpanId ,
33+ generateTraceId ,
34+ getNowInNanoseconds ,
35+ parseEventsField ,
36+ } from "./common.server" ;
37+ import type {
38+ CompleteableTaskRun ,
39+ CreateEventInput ,
40+ EventBuilder ,
41+ IEventRepository ,
42+ RunPreparedEvent ,
43+ SpanDetail ,
44+ SpanSummary ,
45+ TraceAttributes ,
46+ TraceDetailedSummary ,
47+ TraceEventOptions ,
48+ TraceSummary ,
49+ } from "./eventRepository.types" ;
4950import { originalRunIdCache } from "./originalRunIdCache.server" ;
50- import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes" ;
51- import { createJsonErrorObject } from "@trigger.dev/core/v3/errors" ;
5251
5352export type ClickhouseEventRepositoryConfig = {
5453 clickhouse : ClickHouse ;
@@ -342,6 +341,7 @@ export class ClickhouseEventRepository implements IEventRepository {
342341 private createEventToTaskEventV1InputMetadata ( event : CreateEventInput ) : string {
343342 return JSON . stringify ( {
344343 style : event . style ? unflattenAttributes ( event . style ) : undefined ,
344+ attemptNumber : event . attemptNumber ,
345345 } ) ;
346346 }
347347
@@ -783,7 +783,7 @@ export class ClickhouseEventRepository implements IEventRepository {
783783 return ;
784784 }
785785
786- const startTime = convertDateToNanoseconds ( cancelledAt ) ;
786+ const startTime = convertDateToNanoseconds ( run . createdAt ) ;
787787 const expiresAt = convertDateToClickhouseDateTime (
788788 new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
789789 ) ;
@@ -803,7 +803,9 @@ export class ClickhouseEventRepository implements IEventRepository {
803803 kind : "SPAN" ,
804804 status : "CANCELLED" ,
805805 attributes : { } ,
806- metadata : "{}" ,
806+ metadata : JSON . stringify ( {
807+ reason,
808+ } ) ,
807809 expires_at : expiresAt ,
808810 } ;
809811
@@ -867,7 +869,7 @@ export class ClickhouseEventRepository implements IEventRepository {
867869 return acc ;
868870 } , { } as Record < string , TaskEventSummaryV1Result [ ] > ) ;
869871
870- const spanSummaries : Record < string , SpanSummary > = { } ;
872+ const spanSummaries = new Map < string , SpanSummary > ( ) ;
871873 let rootSpanId : string | undefined ;
872874
873875 for ( const [ spanId , spanRecords ] of Object . entries ( recordsGroupedBySpanId ) ) {
@@ -886,7 +888,7 @@ export class ClickhouseEventRepository implements IEventRepository {
886888 spanSummary,
887889 } ) ;
888890
889- spanSummaries [ spanId ] = spanSummary ;
891+ spanSummaries . set ( spanId , spanSummary ) ;
890892
891893 if ( ! rootSpanId && ! spanSummary . parentId ) {
892894 rootSpanId = spanId ;
@@ -901,20 +903,120 @@ export class ClickhouseEventRepository implements IEventRepository {
901903 return ;
902904 }
903905
904- const spans = Object . values ( spanSummaries ) ;
905- const rootSpan = spanSummaries [ rootSpanId ] ;
906+ const spans = Array . from ( spanSummaries . values ( ) ) ;
907+ const rootSpan = spanSummaries . get ( rootSpanId ) ;
908+
909+ if ( ! rootSpan ) {
910+ logger . debug ( "ClickhouseEventRepository.getTraceSummary no rootSpan" , {
911+ spanSummaries,
912+ } ) ;
913+
914+ return ;
915+ }
916+
917+ const finalSpans = spans . map ( ( span ) => {
918+ return this . #applyAncestorOverrides( span , spanSummaries ) ;
919+ } ) ;
906920
907921 logger . info ( "ClickhouseEventRepository.getTraceSummary result" , {
908922 rootSpan,
909- spans,
923+ spans : finalSpans ,
910924 } ) ;
911925
912926 return {
913927 rootSpan,
914- spans,
928+ spans : finalSpans ,
915929 } ;
916930 }
917931
932+ #applyAncestorOverrides( span : SpanSummary , spansById : Map < string , SpanSummary > ) : SpanSummary {
933+ if ( span . data . level !== "TRACE" ) {
934+ return span ;
935+ }
936+
937+ if ( ! span . data . isPartial ) {
938+ return span ;
939+ }
940+
941+ if ( ! span . parentId ) {
942+ return span ;
943+ }
944+
945+ // Now we need to walk the ancestors of the span by span.parentId
946+ // The first ancestor that is a TRACE span that is "closed" we will use to override the span
947+ let parentSpanId : string | undefined = span . parentId ;
948+ let overrideSpan : SpanSummary | undefined ;
949+
950+ while ( parentSpanId ) {
951+ const parentSpan = spansById . get ( parentSpanId ) ;
952+
953+ if ( ! parentSpan ) {
954+ break ;
955+ }
956+
957+ if ( parentSpan . data . level === "TRACE" && ! parentSpan . data . isPartial ) {
958+ overrideSpan = parentSpan ;
959+ break ;
960+ }
961+
962+ parentSpanId = parentSpan . parentId ;
963+ }
964+
965+ if ( overrideSpan ) {
966+ return this . #applyAncestorToSpan( span , overrideSpan ) ;
967+ }
968+
969+ return span ;
970+ }
971+
972+ #applyAncestorToSpan( span : SpanSummary , overrideSpan : SpanSummary ) : SpanSummary {
973+ const overrideEndTime = calculateEndTimeFromStartTime (
974+ overrideSpan . data . startTime ,
975+ overrideSpan . data . duration
976+ ) ;
977+
978+ if ( overrideSpan . data . isCancelled ) {
979+ span . data . isCancelled = true ;
980+ span . data . isPartial = false ;
981+ span . data . isError = false ;
982+ span . data . duration = calculateDurationFromStartJsDate ( span . data . startTime , overrideEndTime ) ;
983+
984+ const cancellationEvent = overrideSpan . data . events . find (
985+ ( event ) => event . name === "cancellation"
986+ ) ;
987+
988+ if ( cancellationEvent ) {
989+ span . data . events . push ( cancellationEvent ) ;
990+ }
991+ }
992+
993+ if ( overrideSpan . data . isError && span . data . attemptNumber ) {
994+ const attemptFailedEvent = overrideSpan . data . events . find (
995+ ( event ) =>
996+ event . name === "attempt_failed" &&
997+ event . properties . attemptNumber === span . data . attemptNumber &&
998+ event . properties . runId === span . runId
999+ ) as AttemptFailedSpanEvent | undefined ;
1000+
1001+ if ( attemptFailedEvent ) {
1002+ span . data . isError = true ;
1003+ span . data . isPartial = false ;
1004+ span . data . isCancelled = false ;
1005+ span . data . duration = calculateDurationFromStartJsDate ( span . data . startTime , overrideEndTime ) ;
1006+ span . data . events . push ( {
1007+ name : "exception" ,
1008+ time : attemptFailedEvent . time ,
1009+ properties : {
1010+ exception : attemptFailedEvent . properties . exception ,
1011+ } ,
1012+ } satisfies ExceptionSpanEvent ) ;
1013+ span . data . events . push ( attemptFailedEvent ) ;
1014+ }
1015+ }
1016+
1017+ return span ;
1018+ }
1019+
9181020 #mergeRecordsIntoSpanSummary(
9191021 spanId : string ,
9201022 records : TaskEventSummaryV1Result [ ]
@@ -955,6 +1057,14 @@ export class ClickhouseEventRepository implements IEventRepository {
9551057
9561058 const parsedMetadata = this . #parseMetadata( record . metadata ) ;
9571059
1060+ if (
1061+ parsedMetadata &&
1062+ "attemptNumber" in parsedMetadata &&
1063+ typeof parsedMetadata . attemptNumber === "number"
1064+ ) {
1065+ span . data . attemptNumber = parsedMetadata . attemptNumber ;
1066+ }
1067+
9581068 if ( record . kind === "ANCESTOR_OVERRIDE" || record . kind === "SPAN_EVENT" ) {
9591069 // We need to add an event to the span
9601070 span . data . events . push ( {
@@ -986,6 +1096,7 @@ export class ClickhouseEventRepository implements IEventRepository {
9861096 typeof record . duration === "number" ? record . duration : Number ( record . duration ) ;
9871097 } else {
9881098 span . data . startTime = convertClickhouseDateTime64ToJsDate ( record . start_time ) ;
1099+ span . data . message = record . message ;
9891100 }
9901101 }
9911102 }
@@ -1173,3 +1284,7 @@ function kindToLevel(kind: string): TaskEventLevel {
11731284function isLogEvent ( kind : string ) : boolean {
11741285 return kind . startsWith ( "LOG_" ) || kind === "DEBUG_EVENT" ;
11751286}
1287+
1288+ function calculateEndTimeFromStartTime ( startTime : Date , duration : number ) : Date {
1289+ return new Date ( startTime . getTime ( ) + duration / 1_000_000 ) ;
1290+ }
0 commit comments