From 8159c6b0a732d8ade8c87b69162e6526ef30357b Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Mon, 5 Jan 2026 15:54:38 +0100 Subject: [PATCH 01/10] feat: add infra for segments metrics --- ...p_segment_metrics_agg_states_ds.datasource | 19 +++++ ..._dashboard_metrics_project_group_sink.pipe | 42 ++++++++++ .../cdp_dashboard_metrics_project_sink.pipe | 43 ++++++++++ ...cdp_dashboard_metrics_subproject_sink.pipe | 51 ++++++++++++ ..._segment_metrics_agg_states_copy_pipe.pipe | 80 +++++++++++++++++++ 5 files changed, 235 insertions(+) create mode 100644 services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe diff --git a/services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource b/services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource new file mode 100644 index 0000000000..0ab1b41c8a --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource @@ -0,0 +1,19 @@ +DESCRIPTION > + Segment-level aggregate states (one row per segmentId per snapshot). + Stores AggregateFunction states for DISTINCT rollups (uniqCombinedState). + +SCHEMA > + `snapshotDate` Date, + `segmentId` String, + `parentId` String, + `grandparentId` String, + `activitiesTotalState` AggregateFunction(sum, UInt64), + `activitiesLast30DaysState` AggregateFunction(sum, UInt64), + `membersUniqState` AggregateFunction(uniqCombined, String), + `membersLast30UniqState` AggregateFunction(uniqCombined, String), + `orgsUniqState` AggregateFunction(uniqCombined, String), + `orgsLast30UniqState` AggregateFunction(uniqCombined, String) + +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY snapshotDate +ENGINE_SORTING_KEY (snapshotDate, segmentId) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe new file mode 100644 index 0000000000..6826caeb35 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe @@ -0,0 +1,42 @@ +DESCRIPTION > + CDP dashboard metrics per segment - PROJECT GROUP level. + Rolls up subproject aggregate states via grandparentId. + +NODE projectGroupMetrics +SQL > + SELECT + s.grandparentId AS segmentId, + 'projectGroup' AS segmentType, + s.grandparentId AS parentId, + s.grandparentId AS grandparentId, + s.grandparentSlug AS segmentSlug, + s.grandparentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.grandparentName AS segmentName, + s.grandparentName AS parentName, + s.grandparentName AS grandparentName, + sumMerge(sa.activitiesTotalState) AS activitiesTotal, + sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + uniqCombinedMerge(sa.membersUniqState) AS membersTotal, + uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, + uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, + uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days + FROM segments AS s + LEFT JOIN + cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + WHERE + s.parentSlug IS NOT NULL + AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL + AND s.grandparentId IS NOT NULL + AND s.parentId != '' + AND s.grandparentId != '' + GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 10 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe new file mode 100644 index 0000000000..67afed28f4 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe @@ -0,0 +1,43 @@ +DESCRIPTION > + CDP dashboard metrics per segment - PROJECT level. + Rolls up subproject aggregate states via parentId. + +NODE projectMetrics +SQL > + SELECT + s.parentId AS segmentId, + 'project' AS segmentType, + s.parentId AS parentId, + s.grandparentId AS grandparentId, + s.parentSlug AS segmentSlug, + s.parentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.parentName AS segmentName, + s.parentName AS parentName, + s.grandparentName AS grandparentName, + sumMerge(sa.activitiesTotalState) AS activitiesTotal, + sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + uniqCombinedMerge(sa.membersUniqState) AS membersTotal, + uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, + uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, + uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days + FROM segments AS s + LEFT JOIN + cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + WHERE + s.parentSlug IS NOT NULL + AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL + AND s.grandparentId IS NOT NULL + AND s.parentId != '' + AND s.grandparentId != '' + GROUP BY + s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 5 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe new file mode 100644 index 0000000000..770c9583ef --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe @@ -0,0 +1,51 @@ +DESCRIPTION > + CDP dashboard metrics per segment - SUBPROJECT level. + Finalizes segment-level aggregate states and exports them. + +NODE subprojectMetrics +SQL > + SELECT + s.id AS segmentId, + 'subproject' AS segmentType, + s.parentId AS parentId, + s.grandparentId AS grandparentId, + s.slug AS segmentSlug, + s.parentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.name AS segmentName, + s.parentName AS parentName, + s.grandparentName AS grandparentName, + sumMerge(sa.activitiesTotalState) AS activitiesTotal, + sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + uniqCombinedMerge(sa.membersUniqState) AS membersTotal, + uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, + uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, + uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days + FROM segments AS s + LEFT JOIN + cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + WHERE + s.parentSlug IS NOT NULL + AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL + AND s.grandparentId IS NOT NULL + AND s.parentId != '' + AND s.grandparentId != '' + GROUP BY + s.id, + s.parentId, + s.grandparentId, + s.slug, + s.parentSlug, + s.grandparentSlug, + s.name, + s.parentName, + s.grandparentName + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 0 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe new file mode 100644 index 0000000000..14fd601240 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe @@ -0,0 +1,80 @@ +DESCRIPTION > + Daily batch job that builds segment-level aggregate states from existing + member and organization segment aggregate datasources. + Heavy DISTINCT logic lives here. + +NODE memberPerSegment +SQL > + -- Finalize per (segmentId, memberId) + SELECT + segmentId, + memberId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive + FROM cdp_member_segment_aggregates_ds + GROUP BY segmentId, memberId + +NODE memberSegmentStates +SQL > + -- Build segment-level activity + member DISTINCT states + SELECT + segmentId, + sumState(activityCount) AS activitiesTotalState, + sumStateIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30DaysState, + uniqCombinedState(memberId) AS membersUniqState, + uniqCombinedStateIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30UniqState + FROM memberPerSegment + GROUP BY segmentId + +NODE orgPerSegment +SQL > + -- Finalize per (segmentId, organizationId) + SELECT segmentId, organizationId, maxMerge(lastActiveState) AS lastActive + FROM cdp_organization_segment_aggregates_ds + GROUP BY segmentId, organizationId + +NODE orgSegmentStates +SQL > + -- Build segment-level organization DISTINCT states + SELECT + segmentId, + uniqCombinedState(organizationId) AS orgsUniqState, + uniqCombinedStateIf( + organizationId, lastActive >= now() - INTERVAL 30 DAY + ) AS orgsLast30UniqState + FROM orgPerSegment + GROUP BY segmentId + +NODE segmentAggStates +SQL > + -- Attach hierarchy and snapshot date + SELECT + toDate(now()) AS snapshotDate, + s.id AS segmentId, + s.parentId AS parentId, + s.grandparentId AS grandparentId, + ms.activitiesTotalState, + ms.activitiesLast30DaysState, + ms.membersUniqState, + ms.membersLast30UniqState, + os.orgsUniqState, + os.orgsLast30UniqState + FROM + ( + SELECT id, parentId, grandparentId + FROM segments + WHERE + parentSlug IS NOT NULL + AND grandparentSlug IS NOT NULL + AND parentId IS NOT NULL + AND grandparentId IS NOT NULL + AND parentId != '' + AND grandparentId != '' + ) AS s + LEFT JOIN memberSegmentStates AS ms ON ms.segmentId = s.id + LEFT JOIN orgSegmentStates AS os ON os.segmentId = s.id + +TYPE COPY +TARGET_DATASOURCE cdp_segment_metrics_agg_states_ds +COPY_MODE append +COPY_SCHEDULE 0 9 * * * From b7a92e79dee2043a833e70723843df3e0efb2c0a Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 15:54:59 +0100 Subject: [PATCH 02/10] fix: review comments --- ...urce => cdp_segment_metrics_ds.datasource} | 0 ...ipe => cdp_segment_metrics_copy_pipe.pipe} | 46 +++++++++++++------ ...p_segment_metrics_project_group_sink.pipe} | 6 ++- ... => cdp_segment_metrics_project_sink.pipe} | 6 ++- ... cdp_segment_metrics_subproject_sink.pipe} | 6 ++- 5 files changed, 45 insertions(+), 19 deletions(-) rename services/libs/tinybird/datasources/{cdp_segment_metrics_agg_states_ds.datasource => cdp_segment_metrics_ds.datasource} (100%) rename services/libs/tinybird/pipes/{cdp_segment_metrics_agg_states_copy_pipe.pipe => cdp_segment_metrics_copy_pipe.pipe} (56%) rename services/libs/tinybird/pipes/{cdp_dashboard_metrics_project_group_sink.pipe => cdp_segment_metrics_project_group_sink.pipe} (89%) rename services/libs/tinybird/pipes/{cdp_dashboard_metrics_project_sink.pipe => cdp_segment_metrics_project_sink.pipe} (89%) rename services/libs/tinybird/pipes/{cdp_dashboard_metrics_subproject_sink.pipe => cdp_segment_metrics_subproject_sink.pipe} (89%) diff --git a/services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource b/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource similarity index 100% rename from services/libs/tinybird/datasources/cdp_segment_metrics_agg_states_ds.datasource rename to services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe similarity index 56% rename from services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe rename to services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index 14fd601240..a146b54bc2 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_agg_states_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -3,13 +3,23 @@ DESCRIPTION > member and organization segment aggregate datasources. Heavy DISTINCT logic lives here. +NODE activitiesSegmentStates +SQL > + -- Compute activity counters from event-level data (correct last-30-days behavior) + SELECT + segmentId, + countState() AS activitiesTotalState, + countStateIf(timestamp >= now() - INTERVAL 30 DAY) AS activitiesLast30DaysState + FROM activityRelations_enriched_deduplicated_ds + WHERE segmentId IS NOT NULL AND segmentId != '' + GROUP BY segmentId + NODE memberPerSegment SQL > -- Finalize per (segmentId, memberId) SELECT segmentId, memberId, - countMerge(activityCountState) AS activityCount, maxMerge(lastActiveState) AS lastActive FROM cdp_member_segment_aggregates_ds GROUP BY segmentId, memberId @@ -19,8 +29,6 @@ SQL > -- Build segment-level activity + member DISTINCT states SELECT segmentId, - sumState(activityCount) AS activitiesTotalState, - sumStateIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30DaysState, uniqCombinedState(memberId) AS membersUniqState, uniqCombinedStateIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30UniqState FROM memberPerSegment @@ -47,18 +55,25 @@ SQL > NODE segmentAggStates SQL > - -- Attach hierarchy and snapshot date + -- Attach hierarchy and snapshot date. + -- Coalesce NULL AggregateFunction states to empty states for segments without activity. SELECT toDate(now()) AS snapshotDate, s.id AS segmentId, s.parentId AS parentId, s.grandparentId AS grandparentId, - ms.activitiesTotalState, - ms.activitiesLast30DaysState, - ms.membersUniqState, - ms.membersLast30UniqState, - os.orgsUniqState, - os.orgsLast30UniqState + + -- Activities (event-level truth) + ifNull(a.activitiesTotalState, countStateIf(0)) AS activitiesTotalState, + ifNull(a.activitiesLast30DaysState, countStateIf(0)) AS activitiesLast30DaysState, + + -- Members DISTINCT states + ifNull(ms.membersUniqState, uniqCombinedStateIf('x', 0)) AS membersUniqState, + ifNull(ms.membersLast30UniqState, uniqCombinedStateIf('x', 0)) AS membersLast30UniqState, + + -- Organizations DISTINCT states + ifNull(os.orgsUniqState, uniqCombinedStateIf('x', 0)) AS orgsUniqState, + ifNull(os.orgsLast30UniqState, uniqCombinedStateIf('x', 0)) AS orgsLast30UniqState FROM ( SELECT id, parentId, grandparentId @@ -71,10 +86,15 @@ SQL > AND parentId != '' AND grandparentId != '' ) AS s - LEFT JOIN memberSegmentStates AS ms ON ms.segmentId = s.id - LEFT JOIN orgSegmentStates AS os ON os.segmentId = s.id + LEFT JOIN activitiesSegmentStates AS a + ON a.segmentId = s.id + LEFT JOIN memberSegmentStates AS ms + ON ms.segmentId = s.id + LEFT JOIN orgSegmentStates AS os + ON os.segmentId = s.id + TYPE COPY -TARGET_DATASOURCE cdp_segment_metrics_agg_states_ds +TARGET_DATASOURCE cdp_segment_metrics_ds COPY_MODE append COPY_SCHEDULE 0 9 * * * diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe similarity index 89% rename from services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe rename to services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe index 6826caeb35..814980c918 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_group_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe @@ -23,7 +23,9 @@ SQL > uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days FROM segments AS s LEFT JOIN - cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + cdp_segment_metrics_ds AS sa + ON sa.segmentId = s.id + AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds) WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL @@ -36,7 +38,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE 10 9 * * * +EXPORT_SCHEDULE 35 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe similarity index 89% rename from services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe rename to services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe index 67afed28f4..b5c311a9fc 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_project_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe @@ -23,7 +23,9 @@ SQL > uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days FROM segments AS s LEFT JOIN - cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + cdp_segment_metrics_ds AS sa + ON sa.segmentId = s.id + AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds) WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL @@ -37,7 +39,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE 5 9 * * * +EXPORT_SCHEDULE 40 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe similarity index 89% rename from services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe rename to services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe index 770c9583ef..a9c62012dc 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe @@ -23,7 +23,9 @@ SQL > uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days FROM segments AS s LEFT JOIN - cdp_segment_metrics_agg_states_ds AS sa ON sa.segmentId = s.id AND sa.snapshotDate = today() + cdp_segment_metrics_ds AS sa + ON sa.segmentId = s.id + AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds) WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL @@ -45,7 +47,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE 0 9 * * * +EXPORT_SCHEDULE 30 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink From 19b5e31b56290515a9f86639de84a166b1497d09 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 15:57:03 +0100 Subject: [PATCH 03/10] fix: format --- .../pipes/cdp_segment_metrics_copy_pipe.pipe | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index a146b54bc2..c17576a9f8 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -17,10 +17,7 @@ SQL > NODE memberPerSegment SQL > -- Finalize per (segmentId, memberId) - SELECT - segmentId, - memberId, - maxMerge(lastActiveState) AS lastActive + SELECT segmentId, memberId, maxMerge(lastActiveState) AS lastActive FROM cdp_member_segment_aggregates_ds GROUP BY segmentId, memberId @@ -62,18 +59,15 @@ SQL > s.id AS segmentId, s.parentId AS parentId, s.grandparentId AS grandparentId, - -- Activities (event-level truth) - ifNull(a.activitiesTotalState, countStateIf(0)) AS activitiesTotalState, - ifNull(a.activitiesLast30DaysState, countStateIf(0)) AS activitiesLast30DaysState, - + ifNull(a.activitiesTotalState, countStateIf(0)) AS activitiesTotalState, + ifNull(a.activitiesLast30DaysState, countStateIf(0)) AS activitiesLast30DaysState, -- Members DISTINCT states - ifNull(ms.membersUniqState, uniqCombinedStateIf('x', 0)) AS membersUniqState, - ifNull(ms.membersLast30UniqState, uniqCombinedStateIf('x', 0)) AS membersLast30UniqState, - + ifNull(ms.membersUniqState, uniqCombinedStateIf('x', 0)) AS membersUniqState, + ifNull(ms.membersLast30UniqState, uniqCombinedStateIf('x', 0)) AS membersLast30UniqState, -- Organizations DISTINCT states - ifNull(os.orgsUniqState, uniqCombinedStateIf('x', 0)) AS orgsUniqState, - ifNull(os.orgsLast30UniqState, uniqCombinedStateIf('x', 0)) AS orgsLast30UniqState + ifNull(os.orgsUniqState, uniqCombinedStateIf('x', 0)) AS orgsUniqState, + ifNull(os.orgsLast30UniqState, uniqCombinedStateIf('x', 0)) AS orgsLast30UniqState FROM ( SELECT id, parentId, grandparentId @@ -86,13 +80,9 @@ SQL > AND parentId != '' AND grandparentId != '' ) AS s - LEFT JOIN activitiesSegmentStates AS a - ON a.segmentId = s.id - LEFT JOIN memberSegmentStates AS ms - ON ms.segmentId = s.id - LEFT JOIN orgSegmentStates AS os - ON os.segmentId = s.id - + LEFT JOIN activitiesSegmentStates AS a ON a.segmentId = s.id + LEFT JOIN memberSegmentStates AS ms ON ms.segmentId = s.id + LEFT JOIN orgSegmentStates AS os ON os.segmentId = s.id TYPE COPY TARGET_DATASOURCE cdp_segment_metrics_ds From 61dd2e2514bc32d0627f0f9bed84c843e4dd9660 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 16:18:23 +0100 Subject: [PATCH 04/10] fix: format --- .../tinybird/datasources/cdp_segment_metrics_ds.datasource | 4 ++-- .../libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe | 2 +- .../pipes/cdp_segment_metrics_project_group_sink.pipe | 4 ++-- .../libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe | 4 ++-- .../tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource b/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource index 0ab1b41c8a..91ad4181e1 100644 --- a/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource +++ b/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource @@ -7,8 +7,8 @@ SCHEMA > `segmentId` String, `parentId` String, `grandparentId` String, - `activitiesTotalState` AggregateFunction(sum, UInt64), - `activitiesLast30DaysState` AggregateFunction(sum, UInt64), + `activitiesTotalState` AggregateFunction(count), + `activitiesLast30DaysState` AggregateFunction(count), `membersUniqState` AggregateFunction(uniqCombined, String), `membersLast30UniqState` AggregateFunction(uniqCombined, String), `orgsUniqState` AggregateFunction(uniqCombined, String), diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index c17576a9f8..c69c15cb53 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -86,5 +86,5 @@ SQL > TYPE COPY TARGET_DATASOURCE cdp_segment_metrics_ds -COPY_MODE append +COPY_MODE replace COPY_SCHEDULE 0 9 * * * diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe index 814980c918..1d3ea0abd5 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe @@ -15,8 +15,8 @@ SQL > s.grandparentName AS segmentName, s.grandparentName AS parentName, s.grandparentName AS grandparentName, - sumMerge(sa.activitiesTotalState) AS activitiesTotal, - sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + countMerge(sa.activitiesTotalState) AS activitiesTotal, + countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe index b5c311a9fc..dec98fc542 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe @@ -15,8 +15,8 @@ SQL > s.parentName AS segmentName, s.parentName AS parentName, s.grandparentName AS grandparentName, - sumMerge(sa.activitiesTotalState) AS activitiesTotal, - sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + countMerge(sa.activitiesTotalState) AS activitiesTotal, + countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe index a9c62012dc..4ffaabfc17 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe @@ -15,8 +15,8 @@ SQL > s.name AS segmentName, s.parentName AS parentName, s.grandparentName AS grandparentName, - sumMerge(sa.activitiesTotalState) AS activitiesTotal, - sumMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, + countMerge(sa.activitiesTotalState) AS activitiesTotal, + countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, From 41ef1ee53dbfc917c421f01da416da1aa8ed2b45 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 17:37:13 +0100 Subject: [PATCH 05/10] fix: project slug anomaly --- .../tinybird/pipes/cdp_segment_metrics_project_sink.pipe | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe index dec98fc542..873cf42020 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe @@ -7,13 +7,13 @@ SQL > SELECT s.parentId AS segmentId, 'project' AS segmentType, - s.parentId AS parentId, + s.grandparentId AS parentId, s.grandparentId AS grandparentId, s.parentSlug AS segmentSlug, - s.parentSlug AS parentSlug, + s.grandparentSlug AS parentSlug, s.grandparentSlug AS grandparentSlug, s.parentName AS segmentName, - s.parentName AS parentName, + s.grandparentName AS parentName, s.grandparentName AS grandparentName, countMerge(sa.activitiesTotalState) AS activitiesTotal, countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, From ab05144e30bf3e0aef908ed0b5713e4bb8612550 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 17:51:16 +0100 Subject: [PATCH 06/10] fix: use snapshot in the activityRelations --- .../libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index c69c15cb53..fd9dc5c852 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -11,7 +11,10 @@ SQL > countState() AS activitiesTotalState, countStateIf(timestamp >= now() - INTERVAL 30 DAY) AS activitiesLast30DaysState FROM activityRelations_enriched_deduplicated_ds - WHERE segmentId IS NOT NULL AND segmentId != '' + WHERE + segmentId IS NOT NULL + AND segmentId != '' + AND snapshotId = (select max(snapshotId) from activityRelations_enriched_deduplicated_ds) GROUP BY segmentId NODE memberPerSegment From 4e416ec3af09299426e914b27ab492d375c53c1b Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 7 Jan 2026 18:24:04 +0100 Subject: [PATCH 07/10] fix: cursor suggestions --- .../pipes/cdp_segment_metrics_copy_pipe.pipe | 111 ++++++++++++------ 1 file changed, 76 insertions(+), 35 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index fd9dc5c852..b324dc8896 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -1,20 +1,32 @@ DESCRIPTION > Daily batch job that builds segment-level aggregate states from existing member and organization segment aggregate datasources. - Heavy DISTINCT logic lives here. + Activities are computed from activityRelations_enriched_deduplicated_ds using the latest snapshotId. + This version avoids CTEs and avoids *StateIf() combinators to keep AggregateFunction types consistent. -NODE activitiesSegmentStates +NODE activitiesAllTimeStates SQL > - -- Compute activity counters from event-level data (correct last-30-days behavior) - SELECT - segmentId, - countState() AS activitiesTotalState, - countStateIf(timestamp >= now() - INTERVAL 30 DAY) AS activitiesLast30DaysState + -- Compute total activities from event-level data (latest snapshot only). + -- Avoid countStateIf() to keep AggregateFunction(count) type consistent. + SELECT segmentId, countState() AS activitiesTotalState + FROM activityRelations_enriched_deduplicated_ds + WHERE + segmentId IS NOT NULL + AND segmentId != '' + AND snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds) + GROUP BY segmentId + +NODE activitiesLast30States +SQL > + -- Compute last-30-days activities from event-level data (latest snapshot only). + -- Use WHERE filtering instead of countStateIf() to avoid AggregateFunction(countIf) types. + SELECT segmentId, countState() AS activitiesLast30DaysState FROM activityRelations_enriched_deduplicated_ds WHERE segmentId IS NOT NULL AND segmentId != '' - AND snapshotId = (select max(snapshotId) from activityRelations_enriched_deduplicated_ds) + AND snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds) + AND timestamp >= now() - INTERVAL 30 DAY GROUP BY segmentId NODE memberPerSegment @@ -24,14 +36,21 @@ SQL > FROM cdp_member_segment_aggregates_ds GROUP BY segmentId, memberId -NODE memberSegmentStates +NODE memberAllTimeStates SQL > - -- Build segment-level activity + member DISTINCT states - SELECT - segmentId, - uniqCombinedState(memberId) AS membersUniqState, - uniqCombinedStateIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30UniqState + -- Build segment-level DISTINCT member states (all time). + -- Avoid uniqCombinedStateIf() to keep AggregateFunction(uniqCombined, String) type consistent. + SELECT segmentId, uniqCombinedState(memberId) AS membersUniqState + FROM memberPerSegment + GROUP BY segmentId + +NODE memberLast30States +SQL > + -- Build segment-level DISTINCT member states (last 30 days). + -- Use WHERE filtering instead of uniqCombinedStateIf() to avoid uniqCombinedIf types. + SELECT segmentId, uniqCombinedState(memberId) AS membersLast30UniqState FROM memberPerSegment + WHERE lastActive >= now() - INTERVAL 30 DAY GROUP BY segmentId NODE orgPerSegment @@ -41,36 +60,55 @@ SQL > FROM cdp_organization_segment_aggregates_ds GROUP BY segmentId, organizationId -NODE orgSegmentStates +NODE orgAllTimeStates SQL > - -- Build segment-level organization DISTINCT states - SELECT - segmentId, - uniqCombinedState(organizationId) AS orgsUniqState, - uniqCombinedStateIf( - organizationId, lastActive >= now() - INTERVAL 30 DAY - ) AS orgsLast30UniqState + -- Build segment-level DISTINCT organization states (all time). + SELECT segmentId, uniqCombinedState(organizationId) AS orgsUniqState FROM orgPerSegment GROUP BY segmentId +NODE orgLast30States +SQL > + -- Build segment-level DISTINCT organization states (last 30 days). + -- Use WHERE filtering instead of uniqCombinedStateIf() to avoid uniqCombinedIf types. + SELECT segmentId, uniqCombinedState(organizationId) AS orgsLast30UniqState + FROM orgPerSegment + WHERE lastActive >= now() - INTERVAL 30 DAY + GROUP BY segmentId + NODE segmentAggStates SQL > -- Attach hierarchy and snapshot date. - -- Coalesce NULL AggregateFunction states to empty states for segments without activity. + -- Coalesce NULL AggregateFunction states to empty states without using *StateIf combinators. + -- Empty states are generated by aggregating over an empty input (system.one WHERE 0). SELECT toDate(now()) AS snapshotDate, s.id AS segmentId, s.parentId AS parentId, s.grandparentId AS grandparentId, - -- Activities (event-level truth) - ifNull(a.activitiesTotalState, countStateIf(0)) AS activitiesTotalState, - ifNull(a.activitiesLast30DaysState, countStateIf(0)) AS activitiesLast30DaysState, - -- Members DISTINCT states - ifNull(ms.membersUniqState, uniqCombinedStateIf('x', 0)) AS membersUniqState, - ifNull(ms.membersLast30UniqState, uniqCombinedStateIf('x', 0)) AS membersLast30UniqState, - -- Organizations DISTINCT states - ifNull(os.orgsUniqState, uniqCombinedStateIf('x', 0)) AS orgsUniqState, - ifNull(os.orgsLast30UniqState, uniqCombinedStateIf('x', 0)) AS orgsLast30UniqState + -- Activities (AggregateFunction(count)) + ifNull( + a.activitiesTotalState, (SELECT countState() FROM system.one WHERE 0) + ) AS activitiesTotalState, + ifNull( + al30.activitiesLast30DaysState, (SELECT countState() FROM system.one WHERE 0) + ) AS activitiesLast30DaysState, + -- Members (AggregateFunction(uniqCombined, String)) + ifNull( + m.membersUniqState, (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) + ) AS membersUniqState, + ifNull( + ml30.membersLast30UniqState, + (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) + ) AS membersLast30UniqState, + -- Organizations (AggregateFunction(uniqCombined, String)) + ifNull( + o.orgsUniqState, (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) + ) AS orgsUniqState, + ifNull( + ol30.orgsLast30UniqState, + (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) + ) AS orgsLast30UniqState FROM ( SELECT id, parentId, grandparentId @@ -83,9 +121,12 @@ SQL > AND parentId != '' AND grandparentId != '' ) AS s - LEFT JOIN activitiesSegmentStates AS a ON a.segmentId = s.id - LEFT JOIN memberSegmentStates AS ms ON ms.segmentId = s.id - LEFT JOIN orgSegmentStates AS os ON os.segmentId = s.id + LEFT JOIN activitiesAllTimeStates AS a ON a.segmentId = s.id + LEFT JOIN activitiesLast30States AS al30 ON al30.segmentId = s.id + LEFT JOIN memberAllTimeStates AS m ON m.segmentId = s.id + LEFT JOIN memberLast30States AS ml30 ON ml30.segmentId = s.id + LEFT JOIN orgAllTimeStates AS o ON o.segmentId = s.id + LEFT JOIN orgLast30States AS ol30 ON ol30.segmentId = s.id TYPE COPY TARGET_DATASOURCE cdp_segment_metrics_ds From 8995a856755118b35e2907199d38b7a4d612034e Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 8 Jan 2026 13:52:00 +0100 Subject: [PATCH 08/10] feat: optimize copy pipe --- .../pipes/cdp_segment_metrics_copy_pipe.pipe | 140 ++++++++---------- 1 file changed, 58 insertions(+), 82 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index b324dc8896..4b3db56e0d 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -1,53 +1,59 @@ DESCRIPTION > Daily batch job that builds segment-level aggregate states from existing member and organization segment aggregate datasources. - Activities are computed from activityRelations_enriched_deduplicated_ds using the latest snapshotId. - This version avoids CTEs and avoids *StateIf() combinators to keep AggregateFunction types consistent. + Activities computed from activityRelations_enriched_deduplicated_ds (latest snapshotId). + Optimized: compute only for "valid segments" early, compute latest snapshot once, + and reuse empty states as constants. -NODE activitiesAllTimeStates +NODE validSegments SQL > - -- Compute total activities from event-level data (latest snapshot only). - -- Avoid countStateIf() to keep AggregateFunction(count) type consistent. - SELECT segmentId, countState() AS activitiesTotalState - FROM activityRelations_enriched_deduplicated_ds + SELECT id, parentId, grandparentId + FROM segments WHERE - segmentId IS NOT NULL - AND segmentId != '' - AND snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds) - GROUP BY segmentId + parentSlug IS NOT NULL + AND grandparentSlug IS NOT NULL + AND parentId IS NOT NULL + AND grandparentId IS NOT NULL + AND parentId != '' + AND grandparentId != '' -NODE activitiesLast30States +NODE activitiesStates SQL > - -- Compute last-30-days activities from event-level data (latest snapshot only). - -- Use WHERE filtering instead of countStateIf() to avoid AggregateFunction(countIf) types. - SELECT segmentId, countState() AS activitiesLast30DaysState - FROM activityRelations_enriched_deduplicated_ds + -- Compute total + last-30-days activities in ONE scan, restricted to valid segments. + SELECT + ar.segmentId, + countState(CAST(1 AS UInt8)) AS activitiesTotalState, + countState( + if(ar.timestamp >= now() - INTERVAL 30 DAY, CAST(1 AS UInt8), NULL) + ) AS activitiesLast30DaysState + FROM activityRelations_enriched_deduplicated_ds AS ar WHERE - segmentId IS NOT NULL + snapshotId = ( + SELECT snapshotId + FROM activityRelations_enriched_deduplicated_ds + ORDER BY snapshotId DESC + LIMIT 1 + ) + AND segmentId IS NOT NULL AND segmentId != '' - AND snapshotId = (SELECT max(snapshotId) FROM activityRelations_enriched_deduplicated_ds) - AND timestamp >= now() - INTERVAL 30 DAY - GROUP BY segmentId + GROUP BY ar.segmentId NODE memberPerSegment SQL > - -- Finalize per (segmentId, memberId) - SELECT segmentId, memberId, maxMerge(lastActiveState) AS lastActive - FROM cdp_member_segment_aggregates_ds - GROUP BY segmentId, memberId + -- Finalize per (segmentId, memberId) only for valid segments + SELECT m.segmentId, m.memberId, maxMerge(m.lastActiveState) AS lastActive + FROM cdp_member_segment_aggregates_ds AS m ANY + INNER JOIN validSegments AS vs ON vs.id = m.segmentId + GROUP BY m.segmentId, m.memberId NODE memberAllTimeStates SQL > - -- Build segment-level DISTINCT member states (all time). - -- Avoid uniqCombinedStateIf() to keep AggregateFunction(uniqCombined, String) type consistent. SELECT segmentId, uniqCombinedState(memberId) AS membersUniqState FROM memberPerSegment GROUP BY segmentId NODE memberLast30States SQL > - -- Build segment-level DISTINCT member states (last 30 days). - -- Use WHERE filtering instead of uniqCombinedStateIf() to avoid uniqCombinedIf types. SELECT segmentId, uniqCombinedState(memberId) AS membersLast30UniqState FROM memberPerSegment WHERE lastActive >= now() - INTERVAL 30 DAY @@ -55,22 +61,20 @@ SQL > NODE orgPerSegment SQL > - -- Finalize per (segmentId, organizationId) - SELECT segmentId, organizationId, maxMerge(lastActiveState) AS lastActive - FROM cdp_organization_segment_aggregates_ds - GROUP BY segmentId, organizationId + -- Finalize per (segmentId, organizationId) only for valid segments + SELECT o.segmentId, o.organizationId, maxMerge(o.lastActiveState) AS lastActive + FROM cdp_organization_segment_aggregates_ds AS o ANY + INNER JOIN validSegments AS vs ON vs.id = o.segmentId + GROUP BY o.segmentId, o.organizationId NODE orgAllTimeStates SQL > - -- Build segment-level DISTINCT organization states (all time). SELECT segmentId, uniqCombinedState(organizationId) AS orgsUniqState FROM orgPerSegment GROUP BY segmentId NODE orgLast30States SQL > - -- Build segment-level DISTINCT organization states (last 30 days). - -- Use WHERE filtering instead of uniqCombinedStateIf() to avoid uniqCombinedIf types. SELECT segmentId, uniqCombinedState(organizationId) AS orgsLast30UniqState FROM orgPerSegment WHERE lastActive >= now() - INTERVAL 30 DAY @@ -78,55 +82,27 @@ SQL > NODE segmentAggStates SQL > - -- Attach hierarchy and snapshot date. - -- Coalesce NULL AggregateFunction states to empty states without using *StateIf combinators. - -- Empty states are generated by aggregating over an empty input (system.one WHERE 0). + -- Attach hierarchy + snapshot date; reuse empty states as constants + WITH + (SELECT countState(CAST(1 AS UInt8)) FROM system.one WHERE 0) AS emptyCountU8State, + (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) AS emptyUniqStringState SELECT toDate(now()) AS snapshotDate, - s.id AS segmentId, - s.parentId AS parentId, - s.grandparentId AS grandparentId, - -- Activities (AggregateFunction(count)) - ifNull( - a.activitiesTotalState, (SELECT countState() FROM system.one WHERE 0) - ) AS activitiesTotalState, - ifNull( - al30.activitiesLast30DaysState, (SELECT countState() FROM system.one WHERE 0) - ) AS activitiesLast30DaysState, - -- Members (AggregateFunction(uniqCombined, String)) - ifNull( - m.membersUniqState, (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) - ) AS membersUniqState, - ifNull( - ml30.membersLast30UniqState, - (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) - ) AS membersLast30UniqState, - -- Organizations (AggregateFunction(uniqCombined, String)) - ifNull( - o.orgsUniqState, (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) - ) AS orgsUniqState, - ifNull( - ol30.orgsLast30UniqState, - (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) - ) AS orgsLast30UniqState - FROM - ( - SELECT id, parentId, grandparentId - FROM segments - WHERE - parentSlug IS NOT NULL - AND grandparentSlug IS NOT NULL - AND parentId IS NOT NULL - AND grandparentId IS NOT NULL - AND parentId != '' - AND grandparentId != '' - ) AS s - LEFT JOIN activitiesAllTimeStates AS a ON a.segmentId = s.id - LEFT JOIN activitiesLast30States AS al30 ON al30.segmentId = s.id - LEFT JOIN memberAllTimeStates AS m ON m.segmentId = s.id - LEFT JOIN memberLast30States AS ml30 ON ml30.segmentId = s.id - LEFT JOIN orgAllTimeStates AS o ON o.segmentId = s.id - LEFT JOIN orgLast30States AS ol30 ON ol30.segmentId = s.id + vs.id AS segmentId, + vs.parentId AS parentId, + vs.grandparentId AS grandparentId, + ifNull(a.activitiesTotalState, emptyCountU8State) AS activitiesTotalState, + ifNull(a.activitiesLast30DaysState, emptyCountU8State) AS activitiesLast30DaysState, + ifNull(m.membersUniqState, emptyUniqStringState) AS membersUniqState, + ifNull(ml30.membersLast30UniqState, emptyUniqStringState) AS membersLast30UniqState, + ifNull(o.orgsUniqState, emptyUniqStringState) AS orgsUniqState, + ifNull(ol30.orgsLast30UniqState, emptyUniqStringState) AS orgsLast30UniqState + FROM validSegments AS vs + LEFT JOIN activitiesStates AS a ON a.segmentId = vs.id + LEFT JOIN memberAllTimeStates AS m ON m.segmentId = vs.id + LEFT JOIN memberLast30States AS ml30 ON ml30.segmentId = vs.id + LEFT JOIN orgAllTimeStates AS o ON o.segmentId = vs.id + LEFT JOIN orgLast30States AS ol30 ON ol30.segmentId = vs.id TYPE COPY TARGET_DATASOURCE cdp_segment_metrics_ds From f903075a50a2616cca61e51b7421ba04324f00c2 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 8 Jan 2026 19:43:43 +0100 Subject: [PATCH 09/10] feat: remove snapshot --- services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe | 1 - 1 file changed, 1 deletion(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe index 4b3db56e0d..2abcb8e01b 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe @@ -87,7 +87,6 @@ SQL > (SELECT countState(CAST(1 AS UInt8)) FROM system.one WHERE 0) AS emptyCountU8State, (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) AS emptyUniqStringState SELECT - toDate(now()) AS snapshotDate, vs.id AS segmentId, vs.parentId AS parentId, vs.grandparentId AS grandparentId, From 87dac216da6de0db96081b84337c6a588c55c0ee Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 9 Jan 2026 14:36:20 +0100 Subject: [PATCH 10/10] fix: grouping fix --- ...dp_segment_metrics_project_group_sink.pipe | 18 +++++------ .../cdp_segment_metrics_project_sink.pipe | 20 ++++++------ .../cdp_segment_metrics_subproject_sink.pipe | 31 +++++++------------ 3 files changed, 30 insertions(+), 39 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe index 1d3ea0abd5..fa60ec352c 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe @@ -7,14 +7,14 @@ SQL > SELECT s.grandparentId AS segmentId, 'projectGroup' AS segmentType, - s.grandparentId AS parentId, - s.grandparentId AS grandparentId, - s.grandparentSlug AS segmentSlug, - s.grandparentSlug AS parentSlug, - s.grandparentSlug AS grandparentSlug, - s.grandparentName AS segmentName, - s.grandparentName AS parentName, - s.grandparentName AS grandparentName, + any (s.grandparentId) AS parentId, + any (s.grandparentId) AS grandparentId, + any (s.grandparentSlug) AS segmentSlug, + any (s.grandparentSlug) AS parentSlug, + any (s.grandparentSlug) AS grandparentSlug, + max(s.grandparentName) AS segmentName, + max(s.grandparentName) AS parentName, + max(s.grandparentName) AS grandparentName, countMerge(sa.activitiesTotalState) AS activitiesTotal, countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, @@ -33,7 +33,7 @@ SQL > AND s.grandparentId IS NOT NULL AND s.parentId != '' AND s.grandparentId != '' - GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName + GROUP BY s.grandparentId TYPE SINK EXPORT_SERVICE kafka diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe index 873cf42020..0570bb8129 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe @@ -7,14 +7,15 @@ SQL > SELECT s.parentId AS segmentId, 'project' AS segmentType, - s.grandparentId AS parentId, - s.grandparentId AS grandparentId, - s.parentSlug AS segmentSlug, - s.grandparentSlug AS parentSlug, - s.grandparentSlug AS grandparentSlug, - s.parentName AS segmentName, - s.grandparentName AS parentName, - s.grandparentName AS grandparentName, + any (s.grandparentId) AS parentId, + any (s.grandparentId) AS grandparentId, + coalesce(nullIf(anyHeavy(s.parentSlug), ''), any (s.parentSlug)) AS segmentSlug, + coalesce(nullIf(anyHeavy(s.grandparentSlug), ''), any (s.grandparentSlug)) AS parentSlug, + coalesce(nullIf(anyHeavy(s.grandparentSlug), ''), any (s.grandparentSlug)) AS grandparentSlug, + -- pick a non-empty name if available + coalesce(nullIf(anyHeavy(s.parentName), ''), any (s.parentName)) AS segmentName, + coalesce(nullIf(anyHeavy(s.grandparentName), ''), any (s.grandparentName)) AS parentName, + coalesce(nullIf(anyHeavy(s.grandparentName), ''), any (s.grandparentName)) AS grandparentName, countMerge(sa.activitiesTotalState) AS activitiesTotal, countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, @@ -33,8 +34,7 @@ SQL > AND s.grandparentId IS NOT NULL AND s.parentId != '' AND s.grandparentId != '' - GROUP BY - s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName + GROUP BY s.parentId TYPE SINK EXPORT_SERVICE kafka diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe index 4ffaabfc17..17ae27d093 100644 --- a/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe @@ -7,23 +7,23 @@ SQL > SELECT s.id AS segmentId, 'subproject' AS segmentType, - s.parentId AS parentId, - s.grandparentId AS grandparentId, - s.slug AS segmentSlug, - s.parentSlug AS parentSlug, - s.grandparentSlug AS grandparentSlug, - s.name AS segmentName, - s.parentName AS parentName, - s.grandparentName AS grandparentName, + any (s.parentId) AS parentId, + any (s.grandparentId) AS grandparentId, + any (s.slug) AS segmentSlug, + any (s.parentSlug) AS parentSlug, + any (s.grandparentSlug) AS grandparentSlug, + max(s.name) AS segmentName, + max(s.parentName) AS parentName, + max(s.grandparentName) AS grandparentName, countMerge(sa.activitiesTotalState) AS activitiesTotal, countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days, uniqCombinedMerge(sa.membersUniqState) AS membersTotal, uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days, uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal, uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days - FROM segments AS s + FROM segments s LEFT JOIN - cdp_segment_metrics_ds AS sa + cdp_segment_metrics_ds sa ON sa.segmentId = s.id AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds) WHERE @@ -33,16 +33,7 @@ SQL > AND s.grandparentId IS NOT NULL AND s.parentId != '' AND s.grandparentId != '' - GROUP BY - s.id, - s.parentId, - s.grandparentId, - s.slug, - s.parentSlug, - s.grandparentSlug, - s.name, - s.parentName, - s.grandparentName + GROUP BY s.id TYPE SINK EXPORT_SERVICE kafka