@@ -1123,4 +1123,308 @@ describe("RunsReplicationService (part 2/2)", () => {
11231123 await runsReplicationService . stop ( ) ;
11241124 }
11251125 ) ;
1126+
1127+ containerTest (
1128+ "should exhaustively replicate all TaskRun columns to ClickHouse" ,
1129+ async ( { clickhouseContainer, redisOptions, postgresContainer, prisma } ) => {
1130+ await prisma . $executeRawUnsafe ( `ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;` ) ;
1131+
1132+ const clickhouse = new ClickHouse ( {
1133+ url : clickhouseContainer . getConnectionUrl ( ) ,
1134+ name : "runs-replication-exhaustive" ,
1135+ logLevel : "warn" ,
1136+ } ) ;
1137+
1138+ const runsReplicationService = new RunsReplicationService ( {
1139+ clickhouse,
1140+ pgConnectionUrl : postgresContainer . getConnectionUri ( ) ,
1141+ serviceName : "runs-replication-exhaustive" ,
1142+ slotName : "task_runs_to_clickhouse_v1" ,
1143+ publicationName : "task_runs_to_clickhouse_v1_publication" ,
1144+ redisOptions,
1145+ maxFlushConcurrency : 1 ,
1146+ flushIntervalMs : 100 ,
1147+ flushBatchSize : 1 ,
1148+ leaderLockTimeoutMs : 5000 ,
1149+ leaderLockExtendIntervalMs : 1000 ,
1150+ ackIntervalSeconds : 5 ,
1151+ logLevel : "warn" ,
1152+ } ) ;
1153+
1154+ await runsReplicationService . start ( ) ;
1155+
1156+ const organization = await prisma . organization . create ( {
1157+ data : {
1158+ title : "test-exhaustive" ,
1159+ slug : "test-exhaustive" ,
1160+ } ,
1161+ } ) ;
1162+
1163+ const project = await prisma . project . create ( {
1164+ data : {
1165+ name : "test-exhaustive" ,
1166+ slug : "test-exhaustive" ,
1167+ organizationId : organization . id ,
1168+ externalRef : "test-exhaustive" ,
1169+ } ,
1170+ } ) ;
1171+
1172+ const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
1173+ data : {
1174+ slug : "test-exhaustive" ,
1175+ type : "PRODUCTION" ,
1176+ projectId : project . id ,
1177+ organizationId : organization . id ,
1178+ apiKey : "test-exhaustive" ,
1179+ pkApiKey : "test-exhaustive" ,
1180+ shortcode : "test-exhaustive" ,
1181+ } ,
1182+ } ) ;
1183+
1184+ // Create a batch for the batchId field
1185+ const batch = await prisma . batchTaskRun . create ( {
1186+ data : {
1187+ friendlyId : "batch_exhaustive" ,
1188+ runtimeEnvironmentId : runtimeEnvironment . id ,
1189+ status : "PENDING" ,
1190+ } ,
1191+ } ) ;
1192+
1193+ // Create a root run for the rootTaskRunId field
1194+ const rootRun = await prisma . taskRun . create ( {
1195+ data : {
1196+ friendlyId : "run_root_exhaustive" ,
1197+ taskIdentifier : "root-task" ,
1198+ payload : JSON . stringify ( { root : true } ) ,
1199+ traceId : "root-trace-id" ,
1200+ spanId : "root-span-id" ,
1201+ queue : "root-queue" ,
1202+ runtimeEnvironmentId : runtimeEnvironment . id ,
1203+ projectId : project . id ,
1204+ organizationId : organization . id ,
1205+ environmentType : "PRODUCTION" ,
1206+ engine : "V2" ,
1207+ } ,
1208+ } ) ;
1209+
1210+ // Create a parent run for the parentTaskRunId field
1211+ const parentRun = await prisma . taskRun . create ( {
1212+ data : {
1213+ friendlyId : "run_parent_exhaustive" ,
1214+ taskIdentifier : "parent-task" ,
1215+ payload : JSON . stringify ( { parent : true } ) ,
1216+ traceId : "parent-trace-id" ,
1217+ spanId : "parent-span-id" ,
1218+ queue : "parent-queue" ,
1219+ runtimeEnvironmentId : runtimeEnvironment . id ,
1220+ projectId : project . id ,
1221+ organizationId : organization . id ,
1222+ environmentType : "PRODUCTION" ,
1223+ engine : "V2" ,
1224+ rootTaskRunId : rootRun . id ,
1225+ depth : 1 ,
1226+ } ,
1227+ } ) ;
1228+
1229+ // Set up all the dates we'll use
1230+ const now = new Date ( ) ;
1231+ const createdAt = new Date ( now . getTime ( ) - 10000 ) ;
1232+ const updatedAt = new Date ( now . getTime ( ) - 5000 ) ;
1233+ const startedAt = new Date ( now . getTime ( ) - 8000 ) ;
1234+ const executedAt = new Date ( now . getTime ( ) - 7500 ) ;
1235+ const completedAt = new Date ( now . getTime ( ) - 6000 ) ;
1236+ const delayUntil = new Date ( now . getTime ( ) - 9000 ) ;
1237+ const queuedAt = new Date ( now . getTime ( ) - 9500 ) ;
1238+ const expiredAt = null ; // Not expired
1239+
1240+ // Create the main task run with ALL fields populated
1241+ const taskRun = await prisma . taskRun . create ( {
1242+ data : {
1243+ // Core identifiers
1244+ friendlyId : "run_exhaustive_test" ,
1245+ taskIdentifier : "exhaustive-task" ,
1246+
1247+ // Environment/project/org
1248+ runtimeEnvironmentId : runtimeEnvironment . id ,
1249+ projectId : project . id ,
1250+ organizationId : organization . id ,
1251+ environmentType : "PRODUCTION" ,
1252+
1253+ // Engine and execution
1254+ engine : "V2" ,
1255+ status : "COMPLETED_SUCCESSFULLY" ,
1256+ attemptNumber : 3 ,
1257+ queue : "exhaustive-queue" ,
1258+ workerQueue : "exhaustive-worker-queue" ,
1259+
1260+ // Relationships
1261+ // Note: scheduleId is not set to test empty string handling
1262+ batchId : batch . id ,
1263+ rootTaskRunId : rootRun . id ,
1264+ parentTaskRunId : parentRun . id ,
1265+ depth : 2 ,
1266+
1267+ // Timestamps
1268+ createdAt,
1269+ updatedAt,
1270+ startedAt,
1271+ executedAt,
1272+ completedAt,
1273+ delayUntil,
1274+ queuedAt,
1275+ expiredAt,
1276+
1277+ // Payload and output
1278+ payload : JSON . stringify ( { input : "test-payload" } ) ,
1279+ payloadType : "application/json" ,
1280+ output : JSON . stringify ( { result : "test-output" } ) ,
1281+ outputType : "application/json" ,
1282+ error : { message : "test error" , name : "TestError" } ,
1283+
1284+ // Tracing
1285+ traceId : "exhaustive-trace-id-12345" ,
1286+ spanId : "exhaustive-span-id-67890" ,
1287+
1288+ // Versioning
1289+ taskVersion : "1.2.3" ,
1290+ sdkVersion : "3.0.0" ,
1291+ cliVersion : "2.5.1" ,
1292+
1293+ // Execution settings
1294+ machinePreset : "large-1x" ,
1295+ idempotencyKey : "exhaustive-idempotency-key" ,
1296+ ttl : "1h" ,
1297+ isTest : true ,
1298+ concurrencyKey : "exhaustive-concurrency-key" ,
1299+ maxDurationInSeconds : 3600 ,
1300+
1301+ // Tags and bulk actions
1302+ runTags : [ "tag1" , "tag2" , "exhaustive-tag" ] ,
1303+ bulkActionGroupIds : [ "bulk-group-1" , "bulk-group-2" ] ,
1304+
1305+ // Usage metrics
1306+ usageDurationMs : 12345 ,
1307+ costInCents : 50 ,
1308+ baseCostInCents : 25 ,
1309+ } ,
1310+ } ) ;
1311+
1312+ // Wait for replication
1313+ await setTimeout ( 1500 ) ;
1314+
1315+ // Query ClickHouse directly to get all columns
1316+ const queryRuns = clickhouse . reader . query ( {
1317+ name : "exhaustive-replication-test" ,
1318+ query : "SELECT * FROM trigger_dev.task_runs_v2 FINAL WHERE run_id = {run_id:String}" ,
1319+ schema : z . any ( ) ,
1320+ params : z . object ( { run_id : z . string ( ) } ) ,
1321+ } ) ;
1322+
1323+ const [ queryError , result ] = await queryRuns ( { run_id : taskRun . id } ) ;
1324+
1325+ expect ( queryError ) . toBeNull ( ) ;
1326+ expect ( result ) . toHaveLength ( 1 ) ;
1327+
1328+ const clickhouseRun = result ! [ 0 ] ;
1329+
1330+ // Exhaustively verify each column
1331+ // Core identifiers
1332+ expect ( clickhouseRun . run_id ) . toBe ( taskRun . id ) ;
1333+ expect ( clickhouseRun . friendly_id ) . toBe ( "run_exhaustive_test" ) ;
1334+ expect ( clickhouseRun . task_identifier ) . toBe ( "exhaustive-task" ) ;
1335+
1336+ // Environment/project/org
1337+ expect ( clickhouseRun . environment_id ) . toBe ( runtimeEnvironment . id ) ;
1338+ expect ( clickhouseRun . project_id ) . toBe ( project . id ) ;
1339+ expect ( clickhouseRun . organization_id ) . toBe ( organization . id ) ;
1340+ expect ( clickhouseRun . environment_type ) . toBe ( "PRODUCTION" ) ;
1341+
1342+ // Engine and execution
1343+ expect ( clickhouseRun . engine ) . toBe ( "V2" ) ;
1344+ expect ( clickhouseRun . status ) . toBe ( "COMPLETED_SUCCESSFULLY" ) ;
1345+ expect ( clickhouseRun . attempt ) . toBe ( 3 ) ;
1346+ expect ( clickhouseRun . queue ) . toBe ( "exhaustive-queue" ) ;
1347+ expect ( clickhouseRun . worker_queue ) . toBe ( "exhaustive-worker-queue" ) ;
1348+
1349+ // Relationships
1350+ expect ( clickhouseRun . schedule_id ) . toBe ( "" ) ; // Empty when not set
1351+ expect ( clickhouseRun . batch_id ) . toBe ( batch . id ) ;
1352+ expect ( clickhouseRun . root_run_id ) . toBe ( rootRun . id ) ;
1353+ expect ( clickhouseRun . parent_run_id ) . toBe ( parentRun . id ) ;
1354+ expect ( clickhouseRun . depth ) . toBe ( 2 ) ;
1355+
1356+ // Timestamps (ClickHouse returns DateTime64 as strings in UTC without 'Z' suffix)
1357+ // Helper to parse ClickHouse timestamp strings to milliseconds
1358+ function parseClickhouseTimestamp ( ts : string | null ) : number | null {
1359+ if ( ts === null || ts === "1970-01-01 00:00:00.000" ) return null ;
1360+ return new Date ( ts + "Z" ) . getTime ( ) ;
1361+ }
1362+
1363+ expect ( parseClickhouseTimestamp ( clickhouseRun . created_at ) ) . toBe ( createdAt . getTime ( ) ) ;
1364+ expect ( parseClickhouseTimestamp ( clickhouseRun . updated_at ) ) . toBe ( updatedAt . getTime ( ) ) ;
1365+ expect ( parseClickhouseTimestamp ( clickhouseRun . started_at ) ) . toBe ( startedAt . getTime ( ) ) ;
1366+ expect ( parseClickhouseTimestamp ( clickhouseRun . executed_at ) ) . toBe ( executedAt . getTime ( ) ) ;
1367+ expect ( parseClickhouseTimestamp ( clickhouseRun . completed_at ) ) . toBe ( completedAt . getTime ( ) ) ;
1368+ expect ( parseClickhouseTimestamp ( clickhouseRun . delay_until ) ) . toBe ( delayUntil . getTime ( ) ) ;
1369+ expect ( parseClickhouseTimestamp ( clickhouseRun . queued_at ) ) . toBe ( queuedAt . getTime ( ) ) ;
1370+ expect ( parseClickhouseTimestamp ( clickhouseRun . expired_at ) ) . toBeNull ( ) ;
1371+
1372+ // Output (parsed JSON)
1373+ expect ( clickhouseRun . output ) . toEqual ( { data : { result : "test-output" } } ) ;
1374+
1375+ // Error
1376+ expect ( clickhouseRun . error ) . toEqual ( {
1377+ data : { message : "test error" , name : "TestError" } ,
1378+ } ) ;
1379+
1380+ // Tracing
1381+ expect ( clickhouseRun . trace_id ) . toBe ( "exhaustive-trace-id-12345" ) ;
1382+ expect ( clickhouseRun . span_id ) . toBe ( "exhaustive-span-id-67890" ) ;
1383+
1384+ // Versioning
1385+ expect ( clickhouseRun . task_version ) . toBe ( "1.2.3" ) ;
1386+ expect ( clickhouseRun . sdk_version ) . toBe ( "3.0.0" ) ;
1387+ expect ( clickhouseRun . cli_version ) . toBe ( "2.5.1" ) ;
1388+
1389+ // Execution settings
1390+ expect ( clickhouseRun . machine_preset ) . toBe ( "large-1x" ) ;
1391+ expect ( clickhouseRun . idempotency_key ) . toBe ( "exhaustive-idempotency-key" ) ;
1392+ expect ( clickhouseRun . expiration_ttl ) . toBe ( "1h" ) ;
1393+ expect ( clickhouseRun . is_test ) . toBe ( 1 ) ; // ClickHouse returns booleans as integers
1394+ expect ( clickhouseRun . concurrency_key ) . toBe ( "exhaustive-concurrency-key" ) ;
1395+ expect ( clickhouseRun . max_duration_in_seconds ) . toBe ( 3600 ) ;
1396+
1397+ // Tags and bulk actions
1398+ expect ( clickhouseRun . tags ) . toEqual ( [ "tag1" , "tag2" , "exhaustive-tag" ] ) ;
1399+ expect ( clickhouseRun . bulk_action_group_ids ) . toEqual ( [ "bulk-group-1" , "bulk-group-2" ] ) ;
1400+
1401+ // Usage metrics
1402+ expect ( clickhouseRun . usage_duration_ms ) . toBe ( 12345 ) ;
1403+ expect ( clickhouseRun . cost_in_cents ) . toBe ( 50 ) ;
1404+ expect ( clickhouseRun . base_cost_in_cents ) . toBe ( 25 ) ;
1405+
1406+ // Internal ClickHouse columns
1407+ expect ( clickhouseRun . _is_deleted ) . toBe ( 0 ) ;
1408+ expect ( clickhouseRun . _version ) . toBeDefined ( ) ;
1409+ expect ( typeof clickhouseRun . _version ) . toBe ( "number" ) ; // ClickHouse returns UInt64 as number
1410+
1411+ // Also verify the payload was inserted into the payloads table
1412+ const queryPayloads = clickhouse . reader . query ( {
1413+ name : "exhaustive-payload-test" ,
1414+ query : "SELECT * FROM trigger_dev.raw_task_runs_payload_v1 WHERE run_id = {run_id:String}" ,
1415+ schema : z . any ( ) ,
1416+ params : z . object ( { run_id : z . string ( ) } ) ,
1417+ } ) ;
1418+
1419+ const [ payloadError , payloadResult ] = await queryPayloads ( { run_id : taskRun . id } ) ;
1420+
1421+ expect ( payloadError ) . toBeNull ( ) ;
1422+ expect ( payloadResult ) . toHaveLength ( 1 ) ;
1423+ expect ( payloadResult ! [ 0 ] . run_id ) . toBe ( taskRun . id ) ;
1424+ expect ( parseClickhouseTimestamp ( payloadResult ! [ 0 ] . created_at ) ) . toBe ( createdAt . getTime ( ) ) ;
1425+ expect ( payloadResult ! [ 0 ] . payload ) . toEqual ( { data : { input : "test-payload" } } ) ;
1426+
1427+ await runsReplicationService . stop ( ) ;
1428+ }
1429+ ) ;
11261430} ) ;
0 commit comments