diff --git a/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource b/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource
new file mode 100644
index 0000000000..91ad4181e1
--- /dev/null
+++ b/services/libs/tinybird/datasources/cdp_segment_metrics_ds.datasource
@@ -0,0 +1,19 @@
+DESCRIPTION >
+    Segment-level aggregate states (one row per segmentId per snapshot).
+    Stores count and uniqCombined AggregateFunction states so parent-level rollups can merge them.
+
+SCHEMA >
+    `snapshotDate` Date,
+    `segmentId` String,
+    `parentId` String,
+    `grandparentId` String,
+    `activitiesTotalState` AggregateFunction(count),
+    `activitiesLast30DaysState` AggregateFunction(count),
+    `membersUniqState` AggregateFunction(uniqCombined, String),
+    `membersLast30UniqState` AggregateFunction(uniqCombined, String),
+    `orgsUniqState` AggregateFunction(uniqCombined, String),
+    `orgsLast30UniqState` AggregateFunction(uniqCombined, String)
+
+ENGINE AggregatingMergeTree
+ENGINE_PARTITION_KEY snapshotDate
+ENGINE_SORTING_KEY (snapshotDate, segmentId)
diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe
new file mode 100644
index 0000000000..2abcb8e01b
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe
@@ -0,0 +1,115 @@
+DESCRIPTION >
+    Daily batch job that builds segment-level aggregate states from existing
+    member and organization segment aggregate datasources.
+    Activities computed from activityRelations_enriched_deduplicated_ds (latest snapshotId).
+    Optimized: compute only for "valid segments" early, compute latest snapshot once,
+    and reuse empty states as constants.
+
+NODE validSegments
+SQL >
+    -- Segments that have both a parent and a grandparent (id and slug present).
+    SELECT id, parentId, grandparentId
+    FROM segments
+    WHERE
+        parentSlug IS NOT NULL
+        AND grandparentSlug IS NOT NULL
+        AND parentId IS NOT NULL
+        AND grandparentId IS NOT NULL
+        AND parentId != ''
+        AND grandparentId != ''
+
+NODE activitiesStates
+SQL >
+    -- Compute total + last-30-days activities in ONE scan, restricted to valid segments.
+    -- count() skips NULL arguments, so the if(..., NULL) form counts only recent rows.
+    SELECT
+        ar.segmentId,
+        countState(CAST(1 AS UInt8)) AS activitiesTotalState,
+        countState(
+            if(ar.timestamp >= now() - INTERVAL 30 DAY, CAST(1 AS UInt8), NULL)
+        ) AS activitiesLast30DaysState
+    FROM activityRelations_enriched_deduplicated_ds AS ar
+    WHERE
+        ar.snapshotId = (
+            SELECT snapshotId
+            FROM activityRelations_enriched_deduplicated_ds
+            ORDER BY snapshotId DESC
+            LIMIT 1
+        )
+        AND ar.segmentId IS NOT NULL
+        AND ar.segmentId != ''
+        -- Apply the valid-segment restriction here so the GROUP BY never sees other segments.
+        AND ar.segmentId IN (SELECT id FROM validSegments)
+    GROUP BY ar.segmentId
+
+NODE memberPerSegment
+SQL >
+    -- Finalize per (segmentId, memberId) only for valid segments
+    SELECT m.segmentId, m.memberId, maxMerge(m.lastActiveState) AS lastActive
+    FROM cdp_member_segment_aggregates_ds AS m
+    ANY INNER JOIN validSegments AS vs ON vs.id = m.segmentId
+    GROUP BY m.segmentId, m.memberId
+
+NODE memberAllTimeStates
+SQL >
+    SELECT segmentId, uniqCombinedState(memberId) AS membersUniqState
+    FROM memberPerSegment
+    GROUP BY segmentId
+
+NODE memberLast30States
+SQL >
+    SELECT segmentId, uniqCombinedState(memberId) AS membersLast30UniqState
+    FROM memberPerSegment
+    WHERE lastActive >= now() - INTERVAL 30 DAY
+    GROUP BY segmentId
+
+NODE orgPerSegment
+SQL >
+    -- Finalize per (segmentId, organizationId) only for valid segments
+    SELECT o.segmentId, o.organizationId, maxMerge(o.lastActiveState) AS lastActive
+    FROM cdp_organization_segment_aggregates_ds AS o
+    ANY INNER JOIN validSegments AS vs ON vs.id = o.segmentId
+    GROUP BY o.segmentId, o.organizationId
+
+NODE orgAllTimeStates
+SQL >
+    SELECT segmentId, uniqCombinedState(organizationId) AS orgsUniqState
+    FROM orgPerSegment
+    GROUP BY segmentId
+
+NODE orgLast30States
+SQL >
+    SELECT segmentId, uniqCombinedState(organizationId) AS orgsLast30UniqState
+    FROM orgPerSegment
+    WHERE lastActive >= now() - INTERVAL 30 DAY
+    GROUP BY segmentId
+
+NODE segmentAggStates
+SQL >
+    -- Attach hierarchy + snapshot date; reuse empty states as constants
+    WITH
+        (SELECT countState(CAST(1 AS UInt8)) FROM system.one WHERE 0) AS emptyCountU8State,
+        (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) AS emptyUniqStringState
+    SELECT
+        -- The target datasource is partitioned and sorted by snapshotDate, so stamp the run date.
+        today() AS snapshotDate,
+        vs.id AS segmentId,
+        vs.parentId AS parentId,
+        vs.grandparentId AS grandparentId,
+        ifNull(a.activitiesTotalState, emptyCountU8State) AS activitiesTotalState,
+        ifNull(a.activitiesLast30DaysState, emptyCountU8State) AS activitiesLast30DaysState,
+        ifNull(m.membersUniqState, emptyUniqStringState) AS membersUniqState,
+        ifNull(ml30.membersLast30UniqState, emptyUniqStringState) AS membersLast30UniqState,
+        ifNull(o.orgsUniqState, emptyUniqStringState) AS orgsUniqState,
+        ifNull(ol30.orgsLast30UniqState, emptyUniqStringState) AS orgsLast30UniqState
+    FROM validSegments AS vs
+    LEFT JOIN activitiesStates AS a ON a.segmentId = vs.id
+    LEFT JOIN memberAllTimeStates AS m ON m.segmentId = vs.id
+    LEFT JOIN memberLast30States AS ml30 ON ml30.segmentId = vs.id
+    LEFT JOIN orgAllTimeStates AS o ON o.segmentId = vs.id
+    LEFT JOIN orgLast30States AS ol30 ON ol30.segmentId = vs.id
+
+TYPE COPY
+TARGET_DATASOURCE cdp_segment_metrics_ds
+COPY_MODE replace
+COPY_SCHEDULE 0 9 * * *
diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe
new file mode 100644
index 0000000000..fa60ec352c
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_group_sink.pipe
@@ -0,0 +1,45 @@
+DESCRIPTION >
+    CDP dashboard metrics per segment - PROJECT GROUP level.
+    Rolls up subproject aggregate states via grandparentId.
+
+NODE projectGroupMetrics
+SQL >
+    -- One row per project group: merge the latest-snapshot subproject states grouped by grandparentId.
+    SELECT
+        s.grandparentId AS segmentId,
+        'projectGroup' AS segmentType,
+        any (s.grandparentId) AS parentId,
+        any (s.grandparentId) AS grandparentId,
+        any (s.grandparentSlug) AS segmentSlug,
+        any (s.grandparentSlug) AS parentSlug,
+        any (s.grandparentSlug) AS grandparentSlug,
+        max(s.grandparentName) AS segmentName,
+        max(s.grandparentName) AS parentName,
+        max(s.grandparentName) AS grandparentName,
+        countMerge(sa.activitiesTotalState) AS activitiesTotal,
+        countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
+        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
+        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
+        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
+        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
+    FROM segments AS s
+    LEFT JOIN
+        cdp_segment_metrics_ds AS sa
+        ON sa.segmentId = s.id
+        AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
+    WHERE
+        s.parentSlug IS NOT NULL
+        AND s.grandparentSlug IS NOT NULL
+        AND s.parentId IS NOT NULL
+        AND s.grandparentId IS NOT NULL
+        AND s.parentId != ''
+        AND s.grandparentId != ''
+    GROUP BY s.grandparentId
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 35 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe
new file mode 100644
index 0000000000..0570bb8129
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe
@@ -0,0 +1,46 @@
+DESCRIPTION >
+    CDP dashboard metrics per segment - PROJECT level.
+    Rolls up subproject aggregate states via parentId.
+
+NODE projectMetrics
+SQL >
+    -- One row per project: merge the latest-snapshot subproject states grouped by parentId.
+    SELECT
+        s.parentId AS segmentId,
+        'project' AS segmentType,
+        any (s.grandparentId) AS parentId,
+        any (s.grandparentId) AS grandparentId,
+        coalesce(nullIf(anyHeavy(s.parentSlug), ''), any (s.parentSlug)) AS segmentSlug,
+        coalesce(nullIf(anyHeavy(s.grandparentSlug), ''), any (s.grandparentSlug)) AS parentSlug,
+        coalesce(nullIf(anyHeavy(s.grandparentSlug), ''), any (s.grandparentSlug)) AS grandparentSlug,
+        -- pick a non-empty name if available
+        coalesce(nullIf(anyHeavy(s.parentName), ''), any (s.parentName)) AS segmentName,
+        coalesce(nullIf(anyHeavy(s.grandparentName), ''), any (s.grandparentName)) AS parentName,
+        coalesce(nullIf(anyHeavy(s.grandparentName), ''), any (s.grandparentName)) AS grandparentName,
+        countMerge(sa.activitiesTotalState) AS activitiesTotal,
+        countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
+        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
+        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
+        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
+        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
+    FROM segments AS s
+    LEFT JOIN
+        cdp_segment_metrics_ds AS sa
+        ON sa.segmentId = s.id
+        AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
+    WHERE
+        s.parentSlug IS NOT NULL
+        AND s.grandparentSlug IS NOT NULL
+        AND s.parentId IS NOT NULL
+        AND s.grandparentId IS NOT NULL
+        AND s.parentId != ''
+        AND s.grandparentId != ''
+    GROUP BY s.parentId
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 40 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
diff --git a/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe
new file mode 100644
index 0000000000..17ae27d093
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_segment_metrics_subproject_sink.pipe
@@ -0,0 +1,45 @@
+DESCRIPTION >
+    CDP dashboard metrics per segment - SUBPROJECT level.
+    Finalizes segment-level aggregate states and exports them.
+
+NODE subprojectMetrics
+SQL >
+    -- One row per subproject: finalize that segment's own latest-snapshot states.
+    SELECT
+        s.id AS segmentId,
+        'subproject' AS segmentType,
+        any (s.parentId) AS parentId,
+        any (s.grandparentId) AS grandparentId,
+        any (s.slug) AS segmentSlug,
+        any (s.parentSlug) AS parentSlug,
+        any (s.grandparentSlug) AS grandparentSlug,
+        max(s.name) AS segmentName,
+        max(s.parentName) AS parentName,
+        max(s.grandparentName) AS grandparentName,
+        countMerge(sa.activitiesTotalState) AS activitiesTotal,
+        countMerge(sa.activitiesLast30DaysState) AS activitiesLast30Days,
+        uniqCombinedMerge(sa.membersUniqState) AS membersTotal,
+        uniqCombinedMerge(sa.membersLast30UniqState) AS membersLast30Days,
+        uniqCombinedMerge(sa.orgsUniqState) AS organizationsTotal,
+        uniqCombinedMerge(sa.orgsLast30UniqState) AS organizationsLast30Days
+    FROM segments AS s
+    LEFT JOIN
+        cdp_segment_metrics_ds AS sa
+        ON sa.segmentId = s.id
+        AND sa.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
+    WHERE
+        s.parentSlug IS NOT NULL
+        AND s.grandparentSlug IS NOT NULL
+        AND s.parentId IS NOT NULL
+        AND s.grandparentId IS NOT NULL
+        AND s.parentId != ''
+        AND s.grandparentId != ''
+    GROUP BY s.id
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 30 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink