@@ -586,7 +586,15 @@ export class ClickhouseEventRepository implements IEventRepository {
586586 this . addToBatch ( event ) ;
587587 }
588588
589- async completeCachedRunEvent ( params : {
589+ async completeCachedRunEvent ( {
590+ run,
591+ blockedRun,
592+ spanId,
593+ parentSpanId,
594+ spanCreatedAt,
595+ isError,
596+ endTime,
597+ } : {
590598 run : CompleteableTaskRun ;
591599 blockedRun : CompleteableTaskRun ;
592600 spanId : string ;
@@ -595,40 +603,211 @@ export class ClickhouseEventRepository implements IEventRepository {
595603 isError : boolean ;
596604 endTime ?: Date ;
597605 } ) : Promise < void > {
598- throw new Error ( "ClickhouseEventRepository.completeCachedRunEvent not implemented" ) ;
606+ if ( ! run . organizationId ) {
607+ return ;
608+ }
609+
610+ const startTime = convertDateToNanoseconds ( spanCreatedAt ) ;
611+ const expiresAt = convertDateToClickhouseDateTime (
612+ new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
613+ ) ;
614+
615+ const event : TaskEventV1Input = {
616+ environment_id : run . runtimeEnvironmentId ,
617+ organization_id : run . organizationId ,
618+ project_id : run . projectId ,
619+ task_identifier : run . taskIdentifier ,
620+ run_id : blockedRun . friendlyId ,
621+ start_time : startTime . toString ( ) ,
622+ duration : calculateDurationFromStart ( startTime , endTime ?? new Date ( ) ) . toString ( ) ,
623+ trace_id : blockedRun . traceId ,
624+ span_id : spanId ,
625+ parent_span_id : parentSpanId ,
626+ message : run . taskIdentifier ,
627+ kind : "SPAN" ,
628+ status : isError ? "ERROR" : "OK" ,
629+ attributes : { } ,
630+ metadata : "{}" ,
631+ expires_at : expiresAt ,
632+ } ;
633+
634+ this . addToBatch ( event ) ;
599635 }
600636
601- async completeFailedRunEvent ( params : {
637+ async completeFailedRunEvent ( {
638+ run,
639+ endTime,
640+ exception,
641+ } : {
602642 run : CompleteableTaskRun ;
603643 endTime ?: Date ;
604644 exception : { message ?: string ; type ?: string ; stacktrace ?: string } ;
605645 } ) : Promise < void > {
606- throw new Error ( "ClickhouseEventRepository.completeFailedRunEvent not implemented" ) ;
646+ if ( ! run . organizationId ) {
647+ return ;
648+ }
649+
650+ const startTime = convertDateToNanoseconds ( run . createdAt ) ;
651+ const expiresAt = convertDateToClickhouseDateTime (
652+ new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
653+ ) ;
654+
655+ const event : TaskEventV1Input = {
656+ environment_id : run . runtimeEnvironmentId ,
657+ organization_id : run . organizationId ,
658+ project_id : run . projectId ,
659+ task_identifier : run . taskIdentifier ,
660+ run_id : run . friendlyId ,
661+ start_time : startTime . toString ( ) ,
662+ duration : calculateDurationFromStart ( startTime , endTime ?? new Date ( ) ) . toString ( ) ,
663+ trace_id : run . traceId ,
664+ span_id : run . spanId ,
665+ parent_span_id : run . parentSpanId ?? "" ,
666+ message : run . taskIdentifier ,
667+ kind : "SPAN" ,
668+ status : "ERROR" ,
669+ attributes : {
670+ error : {
671+ name : exception . type ,
672+ message : exception . message ,
673+ stackTrace : exception . stacktrace ,
674+ } ,
675+ } ,
676+ metadata : "{}" ,
677+ expires_at : expiresAt ,
678+ } ;
679+
680+ this . addToBatch ( event ) ;
607681 }
608682
609- async completeExpiredRunEvent ( params : {
683+ async completeExpiredRunEvent ( {
684+ run,
685+ endTime,
686+ ttl,
687+ } : {
610688 run : CompleteableTaskRun ;
611689 endTime ?: Date ;
612690 ttl : string ;
613691 } ) : Promise < void > {
614- throw new Error ( "ClickhouseEventRepository.completeExpiredRunEvent not implemented" ) ;
692+ if ( ! run . organizationId ) {
693+ return ;
694+ }
695+
696+ const startTime = convertDateToNanoseconds ( run . createdAt ) ;
697+ const expiresAt = convertDateToClickhouseDateTime (
698+ new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
699+ ) ;
700+
701+ const event : TaskEventV1Input = {
702+ environment_id : run . runtimeEnvironmentId ,
703+ organization_id : run . organizationId ,
704+ project_id : run . projectId ,
705+ task_identifier : run . taskIdentifier ,
706+ run_id : run . friendlyId ,
707+ start_time : startTime . toString ( ) ,
708+ duration : calculateDurationFromStart ( startTime , endTime ?? new Date ( ) ) . toString ( ) ,
709+ trace_id : run . traceId ,
710+ span_id : run . spanId ,
711+ parent_span_id : run . parentSpanId ?? "" ,
712+ message : run . taskIdentifier ,
713+ kind : "SPAN" ,
714+ status : "ERROR" ,
715+ attributes : {
716+ error : {
717+ message : `Run expired because the TTL (${ ttl } ) was reached` ,
718+ } ,
719+ } ,
720+ metadata : "{}" ,
721+ expires_at : expiresAt ,
722+ } ;
723+
724+ this . addToBatch ( event ) ;
615725 }
616726
617- async createAttemptFailedRunEvent ( params : {
727+ async createAttemptFailedRunEvent ( {
728+ run,
729+ endTime,
730+ attemptNumber,
731+ exception,
732+ } : {
618733 run : CompleteableTaskRun ;
619734 endTime ?: Date ;
620735 attemptNumber : number ;
621736 exception : { message ?: string ; type ?: string ; stacktrace ?: string } ;
622737 } ) : Promise < void > {
623- throw new Error ( "ClickhouseEventRepository.createAttemptFailedRunEvent not implemented" ) ;
738+ if ( ! run . organizationId ) {
739+ return ;
740+ }
741+
742+ const startTime = convertDateToNanoseconds ( endTime ?? new Date ( ) ) ;
743+ const expiresAt = convertDateToClickhouseDateTime (
744+ new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
745+ ) ;
746+
747+ const event : TaskEventV1Input = {
748+ environment_id : run . runtimeEnvironmentId ,
749+ organization_id : run . organizationId ,
750+ project_id : run . projectId ,
751+ task_identifier : run . taskIdentifier ,
752+ run_id : run . friendlyId ,
753+ start_time : startTime . toString ( ) ,
754+ duration : "0" ,
755+ trace_id : run . traceId ,
756+ span_id : run . spanId ,
757+ parent_span_id : run . parentSpanId ?? "" ,
758+ message : "attempt_failed" ,
759+ kind : "ANCESTOR_OVERRIDE" ,
760+ status : "OK" ,
761+ attributes : { } ,
762+ metadata : JSON . stringify ( {
763+ exception,
764+ attemptNumber,
765+ runId : run . friendlyId ,
766+ } ) ,
767+ expires_at : expiresAt ,
768+ } ;
769+
770+ this . addToBatch ( event ) ;
624771 }
625772
626- async cancelRunEvent ( params : {
773+ async cancelRunEvent ( {
774+ reason,
775+ run,
776+ cancelledAt,
777+ } : {
627778 reason : string ;
628779 run : CompleteableTaskRun ;
629780 cancelledAt : Date ;
630781 } ) : Promise < void > {
631- throw new Error ( "ClickhouseEventRepository.cancelRunEvent not implemented" ) ;
782+ if ( ! run . organizationId ) {
783+ return ;
784+ }
785+
786+ const startTime = convertDateToNanoseconds ( cancelledAt ) ;
787+ const expiresAt = convertDateToClickhouseDateTime (
788+ new Date ( run . createdAt . getTime ( ) + 30 * 24 * 60 * 60 * 1000 )
789+ ) ;
790+
791+ const event : TaskEventV1Input = {
792+ environment_id : run . runtimeEnvironmentId ,
793+ organization_id : run . organizationId ,
794+ project_id : run . projectId ,
795+ task_identifier : run . taskIdentifier ,
796+ run_id : run . friendlyId ,
797+ start_time : startTime . toString ( ) ,
798+ duration : calculateDurationFromStart ( startTime , cancelledAt ) . toString ( ) ,
799+ trace_id : run . traceId ,
800+ span_id : run . spanId ,
801+ parent_span_id : run . parentSpanId ?? "" ,
802+ message : run . taskIdentifier ,
803+ kind : "SPAN" ,
804+ status : "CANCELLED" ,
805+ attributes : { } ,
806+ metadata : "{}" ,
807+ expires_at : expiresAt ,
808+ } ;
809+
810+ this . addToBatch ( event ) ;
632811 }
633812
634813 async crashEvent ( params : {
@@ -786,7 +965,7 @@ export class ClickhouseEventRepository implements IEventRepository {
786965 // We need to add an event to the span
787966 span . data . events . push ( {
788967 name : record . message ,
789- time : new Date ( record . start_time ) ,
968+ time : convertClickhouseDateTime64ToJsDate ( record . start_time ) ,
790969 properties : parsedMetadata ?? { } ,
791970 } ) ;
792971 }
0 commit comments