From daccc8dc4463954afd076b6a663dff4f5ac59e46 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 3 Dec 2025 17:52:44 +0100 Subject: [PATCH 01/11] feat: add cdp_metrics_sink --- .../pipes/cdp_dashboard_metrics_sink.pipe | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe new file mode 100644 index 0000000000..cd13a7ad02 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe @@ -0,0 +1,62 @@ +DESCRIPTION > + Global metrics for activities, organizations, and members used for CDP dashboard. + +NODE activityRelations_metrics +SQL > + SELECT + count() AS activities_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days + FROM activityRelations_deduplicated_ds +END + +NODE organizations_metrics +SQL > + SELECT + count() AS organizations_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days + FROM organizations FINAL +END + +NODE members_metrics +SQL > + SELECT + count() AS members_total, + countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days + FROM members FINAL +END + +NODE merge_results +SQL > +SELECT + -- activity + (SELECT activities_total + FROM activityRelations_metrics) AS activities_total, + (SELECT activities_last_30_days + FROM activityRelations_metrics) AS activities_last_30_days, + + -- organizations + (SELECT organizations_total + FROM organizations_metrics) AS organizations_total, + (SELECT organizations_last_30_days + FROM organizations_metrics) AS organizations_last_30_days, + + -- members + (SELECT members_total + FROM members_metrics) AS members_total, + (SELECT members_last_30_days + FROM members_metrics) AS members_last_30_days +END + +NODE cdp_dashboard_full_metrics +SQL > + SELECT * + FROM merge_results +END + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 30 0 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_sink From 5489c255b0ac1d3af1c26c6e251cee30c45ff838 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 3 Dec 2025 17:53:41 +0100 Subject: [PATCH 02/11] feat: add cdp_metrics_sink --- .../pipes/cdp_dashboard_metrics_sink.pipe | 61 +++++++------------ 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe index cd13a7ad02..0b9bb487df 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe @@ -1,57 +1,42 @@ DESCRIPTION > - Global metrics for activities, organizations, and members used for CDP dashboard. + Global metrics for activities, organizations, and members used for CDP dashboard. NODE activityRelations_metrics SQL > - SELECT - count() AS activities_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days - FROM activityRelations_deduplicated_ds -END + SELECT + count() AS activities_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days + FROM activityRelations_deduplicated_ds NODE organizations_metrics SQL > - SELECT - count() AS organizations_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days - FROM organizations FINAL -END + SELECT + count() AS organizations_total, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days + FROM organizations FINAL NODE members_metrics SQL > - SELECT - count() AS members_total, - countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days - FROM members FINAL -END + SELECT + count() AS members_total, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days + FROM members FINAL NODE merge_results SQL > -SELECT - -- activity - (SELECT activities_total - FROM activityRelations_metrics) AS activities_total, - (SELECT activities_last_30_days - FROM activityRelations_metrics) AS activities_last_30_days, - - -- organizations - (SELECT organizations_total - FROM organizations_metrics) AS organizations_total, - (SELECT organizations_last_30_days - FROM organizations_metrics) AS organizations_last_30_days, - - -- members - (SELECT members_total - FROM members_metrics) AS members_total, - (SELECT members_last_30_days - FROM members_metrics) AS members_last_30_days -END + SELECT + -- activity + (SELECT activities_total FROM activityRelations_metrics) AS activities_total, + (SELECT activities_last_30_days FROM activityRelations_metrics) AS activities_last_30_days, + -- organizations + (SELECT organizations_total FROM organizations_metrics) AS organizations_total, + (SELECT organizations_last_30_days FROM organizations_metrics) AS organizations_last_30_days, + -- members + (SELECT members_total FROM members_metrics) AS members_total, + (SELECT members_last_30_days FROM members_metrics) AS members_last_30_days NODE cdp_dashboard_full_metrics SQL > - SELECT * - FROM merge_results -END + SELECT * FROM merge_results TYPE SINK EXPORT_SERVICE kafka From d48a4bc8b96a106b7b1f4abaefac7f68af7a9e1a Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 9 Dec 2025 11:49:42 +0100 Subject: [PATCH 03/11] fix: adjust pipes name for total metrics --- .../pipes/cdp_dashboard_metrics_sink.pipe | 47 ------------------- .../cdp_dashboard_metrics_total_sink.pipe | 46 ++++++++++++++++++ 2 files changed, 46 insertions(+), 47 deletions(-) delete mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe deleted file mode 100644 index 0b9bb487df..0000000000 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_sink.pipe +++ /dev/null @@ -1,47 +0,0 @@ -DESCRIPTION > - Global metrics for activities, organizations, and members used for CDP dashboard. - -NODE activityRelations_metrics -SQL > - SELECT - count() AS activities_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS activities_last_30_days - FROM activityRelations_deduplicated_ds - -NODE organizations_metrics -SQL > - SELECT - count() AS organizations_total, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizations_last_30_days - FROM organizations FINAL - -NODE members_metrics -SQL > - SELECT - count() AS members_total, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS members_last_30_days - FROM members FINAL - -NODE merge_results -SQL > - SELECT - -- activity - (SELECT activities_total FROM activityRelations_metrics) AS activities_total, - (SELECT activities_last_30_days FROM activityRelations_metrics) AS activities_last_30_days, - -- organizations - (SELECT organizations_total FROM organizations_metrics) AS organizations_total, - (SELECT organizations_last_30_days FROM organizations_metrics) AS organizations_last_30_days, - -- members - (SELECT members_total FROM members_metrics) AS members_total, - (SELECT members_last_30_days FROM members_metrics) AS members_last_30_days - -NODE cdp_dashboard_full_metrics -SQL > - SELECT * FROM merge_results - -TYPE SINK -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_SCHEDULE 30 0 * * * -EXPORT_FORMAT csv -EXPORT_STRATEGY @new -EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe new file mode 100644 index 0000000000..51c219a9dc --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe @@ -0,0 +1,46 @@ +DESCRIPTION > + Global metrics for activities, organizations, and members used for CDP dashboard. We. are referring to the total witouth filtering by any segment + +NODE activityRelationsMetricsTotal +SQL > + SELECT + count() AS activitiesTotal, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days + FROM activityRelations_deduplicated_ds + +NODE organizationsMetricsTotal +SQL > + SELECT + count() AS organizationsTotal, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM organizations FINAL + +NODE membersMetricsTotal +SQL > + SELECT count() AS membersTotal, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days + FROM members FINAL + +NODE mergeResults +SQL > + SELECT + -- activity + (SELECT activitiesTotal FROM activityRelationsMetricsTotal) AS activitiesTotal, + (SELECT activitiesLast30Days FROM activityRelationsMetricsTotal) AS activitiesLast30Days, + -- organizations + (SELECT organizationsTotal FROM organizationsMetricsTotal) AS organizationsTotal, + (SELECT organizationsLast30Days FROM organizationsMetricsTotal) AS organizationsLast30Days, + -- members + (SELECT membersTotal FROM membersMetricsTotal) AS membersTotal, + (SELECT membersLast30Days FROM membersMetricsTotal) AS membersLast30Days + +NODE cdpDashboardFullMetricsTotal +SQL > + SELECT * FROM mergeResults + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging +EXPORT_SCHEDULE 30 0 * * * +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink From ff5f282de6c121b385523a2465d7ef2f03e192c3 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 9 Dec 2025 22:23:35 +0100 Subject: [PATCH 04/11] feat: add custom smt --- scripts/scaffold/kafka-connect/Dockerfile | 3 +++ .../libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index 015bf97f41..f6070a0b3a 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -5,7 +5,10 @@ USER root RUN yum install -y jq findutils unzip RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt +RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt + COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ +COPY tmp/custom-plugins/tinybird-smt-1.0.0.jar /usr/share/java/tinybird-smt-1.0.0.jar VOLUME /storage diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe index 51c219a9dc..26c15991b9 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe @@ -41,6 +41,6 @@ TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging EXPORT_SCHEDULE 30 0 * * * -EXPORT_FORMAT csv +EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink From 5fbcf64431c1504be27c67bf960fb107ac8360a4 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 10 Dec 2025 16:42:38 +0100 Subject: [PATCH 05/11] feat: add per segment pipe --- scripts/scaffold/kafka-connect/Dockerfile | 2 +- ...p_dashboard_metrics_per_segments_sink.pipe | 61 +++++++++++++++++++ .../cdp_dashboard_metrics_total_sink.pipe | 2 +- 3 files changed, 63 insertions(+), 2 deletions(-) create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index f6070a0b3a..1aacf70d27 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -8,7 +8,7 @@ RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prom RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ -COPY tmp/custom-plugins/tinybird-smt-1.0.0.jar /usr/share/java/tinybird-smt-1.0.0.jar +COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar VOLUME /storage diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe new file mode 100644 index 0000000000..d62751310b --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -0,0 +1,61 @@ +DESCRIPTION > + CDP dashboard aggregated metrics by segment. + For each root segmentId (which can be: + - a grandparentId -> aggregates all segments with that grandparentId + - a parentId -> aggregates all segments with that parentId + - a standalone id -> only itself) + we compute: + - activitiesTotal / activitiesLast30Days (from activityRelations_deduplicated_ds) + - organizationsTotal / organizationsLast30Days (distinct orgs created in the last 30 days) + - membersTotal / membersLast30Days (distinct members created in the last 30 days) + Assumptions: + - Hierarchy is tree-like: a sub-project belongs to exactly one parent/grandparent. + - A member present in multiple sub-projects under the same root is counted ONCE for that root. + - "Last 30 days" for members/orgs means "created in the last 30 days". + +NODE segmentsRootMapping +SQL > + SELECT grandparentId AS segmentId, id AS childSegmentId + FROM segments + WHERE grandparentId IS NOT NULL + UNION ALL + SELECT parentId AS segmentId, id AS childSegmentId + FROM segments + WHERE parentId IS NOT NULL AND grandparentId IS NULL + UNION ALL + SELECT id AS segmentId, id AS childSegmentId + FROM segments + WHERE parentId IS NULL AND grandparentId IS NULL + +NODE segmentMetricsFromActivities +SQL > + SELECT + m.segmentId AS segmentId, + -- activity metrics + count() AS activitiesTotal, + countIf(a.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + -- member metrics + countDistinct(a.memberId) AS membersTotal, + countDistinctIf(a.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + -- organization metrics + countDistinct(a.organizationId) AS organizationsTotal, + countDistinctIf( + a.organizationId, org.createdAt >= now() - INTERVAL 30 DAY + ) AS organizationsLast30Days + FROM activityRelations_deduplicated_ds AS a + INNER JOIN segmentsRootMapping AS m ON a.segmentId = m.childSegmentId + LEFT JOIN members AS mem FINAL ON a.memberId = mem.id + LEFT JOIN organizations AS org FINAL ON a.organizationId = org.id + GROUP BY segmentId + +NODE cdpDashboardMetricsPerSegment +SQL > + SELECT * FROM segmentMetricsFromActivities + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging +EXPORT_SCHEDULE 30 0 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe index 26c15991b9..c38a151efd 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe @@ -40,7 +40,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging -EXPORT_SCHEDULE 30 0 * * * +EXPORT_SCHEDULE 0 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink From 06df96d19481008fa6d491c3589fdf60acfbd17a Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 11 Dec 2025 18:53:26 +0100 Subject: [PATCH 06/11] fix: segments - child permutation --- ...p_dashboard_metrics_per_segments_sink.pipe | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe index d62751310b..2459b6470b 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -1,31 +1,37 @@ DESCRIPTION > CDP dashboard aggregated metrics by segment. - For each root segmentId (which can be: - - a grandparentId -> aggregates all segments with that grandparentId - - a parentId -> aggregates all segments with that parentId - - a standalone id -> only itself) - we compute: + For each segmentId (root, parent or leaf) we aggregate metrics over + that segment and all its descendants in the hierarchy. + We compute: - activitiesTotal / activitiesLast30Days (from activityRelations_deduplicated_ds) - organizationsTotal / organizationsLast30Days (distinct orgs created in the last 30 days) - membersTotal / membersLast30Days (distinct members created in the last 30 days) Assumptions: - Hierarchy is tree-like: a sub-project belongs to exactly one parent/grandparent. - - A member present in multiple sub-projects under the same root is counted ONCE for that root. + - A member present in multiple sub-projects under the same segment subtree is counted ONCE per segmentId. - "Last 30 days" for members/orgs means "created in the last 30 days". -NODE segmentsRootMapping +NODE segmentsHierarchyMapping SQL > - SELECT grandparentId AS segmentId, id AS childSegmentId + -- For every segment, map it to: + -- - itself + -- - its direct children + -- - its grandchildren (via grandparentId) + -- + -- Result: + -- - a root segment will have: (root, root), (root, child), (root, grandchild) + -- - a mid-level segment will: (mid, mid), (mid, grandchild) + -- - a leaf segment will only: (leaf, leaf) + SELECT id AS segmentId, id AS childSegmentId FROM segments - WHERE grandparentId IS NOT NULL UNION ALL SELECT parentId AS segmentId, id AS childSegmentId FROM segments - WHERE parentId IS NOT NULL AND grandparentId IS NULL + WHERE parentId IS NOT NULL UNION ALL - SELECT id AS segmentId, id AS childSegmentId + SELECT grandparentId AS segmentId, id AS childSegmentId FROM segments - WHERE parentId IS NULL AND grandparentId IS NULL + WHERE grandparentId IS NOT NULL NODE segmentMetricsFromActivities SQL > @@ -43,10 +49,10 @@ SQL > a.organizationId, org.createdAt >= now() - INTERVAL 30 DAY ) AS organizationsLast30Days FROM activityRelations_deduplicated_ds AS a - INNER JOIN segmentsRootMapping AS m ON a.segmentId = m.childSegmentId + INNER JOIN segmentsHierarchyMapping AS m ON a.segmentId = m.childSegmentId LEFT JOIN members AS mem FINAL ON a.memberId = mem.id LEFT JOIN organizations AS org FINAL ON a.organizationId = org.id - GROUP BY segmentId + GROUP BY m.segmentId NODE cdpDashboardMetricsPerSegment SQL > @@ -55,7 +61,7 @@ SQL > TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging -EXPORT_SCHEDULE 30 0 * * * +EXPORT_SCHEDULE 0 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink From c26a0c252ff1b750d212de9a131507234e212a39 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 12 Dec 2025 12:54:28 +0100 Subject: [PATCH 07/11] fix: add check on parent granparent --- .../pipes/cdp_dashboard_metrics_per_segments_sink.pipe | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe index 2459b6470b..4eb5122a75 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -27,11 +27,11 @@ SQL > UNION ALL SELECT parentId AS segmentId, id AS childSegmentId FROM segments - WHERE parentId IS NOT NULL + WHERE parentId IS NOT NULL AND parentId != '' UNION ALL SELECT grandparentId AS segmentId, id AS childSegmentId FROM segments - WHERE grandparentId IS NOT NULL + WHERE grandparentId IS NOT NULL AND grandparentId != '' NODE segmentMetricsFromActivities SQL > From 0c0d3df9a5220c7099d045e70a1c7fc5782a3020 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 12 Dec 2025 14:50:04 +0100 Subject: [PATCH 08/11] fix: hierarky in tb --- ...p_dashboard_metrics_per_segments_sink.pipe | 41 +++++++++++-------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe index 4eb5122a75..a7b82a611c 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -13,25 +13,34 @@ DESCRIPTION > NODE segmentsHierarchyMapping SQL > - -- For every segment, map it to: - -- - itself - -- - its direct children - -- - its grandchildren (via grandparentId) - -- - -- Result: - -- - a root segment will have: (root, root), (root, child), (root, grandchild) - -- - a mid-level segment will: (mid, mid), (mid, grandchild) - -- - a leaf segment will only: (leaf, leaf) + -- For every segment, map it to its descendant subprojects only + -- Only subprojects (leaf segments) should have activities in activityRelations_deduplicated_ds + -- Project groups aggregate from their descendant subprojects + -- Projects aggregate from their direct child subprojects + -- Subprojects aggregate only their own activities SELECT id AS segmentId, id AS childSegmentId - FROM segments + FROM segments FINAL + WHERE (parentSlug IS NULL AND grandparentSlug IS NULL) + OR (parentSlug IS NOT NULL AND grandparentSlug IS NULL) + OR (parentSlug IS NOT NULL AND grandparentSlug IS NOT NULL) UNION ALL - SELECT parentId AS segmentId, id AS childSegmentId - FROM segments - WHERE parentId IS NOT NULL AND parentId != '' + -- Project groups include all their descendant subprojects + SELECT s_pg.id AS segmentId, s_sub.id AS childSegmentId + FROM (SELECT * FROM segments FINAL) AS s_pg + INNER JOIN (SELECT * FROM segments FINAL) AS s_sub ON s_sub.grandparentId = s_pg.id + WHERE s_pg.parentSlug IS NULL AND s_pg.grandparentSlug IS NULL + AND s_sub.parentSlug IS NOT NULL AND s_sub.grandparentSlug IS NOT NULL + AND s_pg.grandparentId IS NULL AND s_pg.parentId IS NULL + AND s_sub.grandparentId != '' AND s_sub.parentId != '' UNION ALL - SELECT grandparentId AS segmentId, id AS childSegmentId - FROM segments - WHERE grandparentId IS NOT NULL AND grandparentId != '' + -- Projects include their direct child subprojects + SELECT s_p.id AS segmentId, s_sub.id AS childSegmentId + FROM (SELECT * FROM segments FINAL) AS s_p + INNER JOIN (SELECT * FROM segments FINAL) AS s_sub ON s_sub.parentId = s_p.id + WHERE s_p.parentSlug IS NOT NULL AND s_p.grandparentSlug IS NULL + AND s_sub.parentSlug IS NOT NULL AND s_sub.grandparentSlug IS NOT NULL + AND s_p.grandparentId IS NULL AND s_p.parentId != '' + AND s_sub.grandparentId != '' AND s_sub.parentId != '' NODE segmentMetricsFromActivities SQL > From fa2a8d8a2771d6d20e613487446ff58aeeea98ec Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 12 Dec 2025 17:12:16 +0100 Subject: [PATCH 09/11] fix: segment pipe --- ...p_dashboard_metrics_per_segments_sink.pipe | 144 +++++++++++------- 1 file changed, 86 insertions(+), 58 deletions(-) diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe index a7b82a611c..03771a9cec 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -1,71 +1,99 @@ DESCRIPTION > - CDP dashboard aggregated metrics by segment. - For each segmentId (root, parent or leaf) we aggregate metrics over - that segment and all its descendants in the hierarchy. - We compute: - - activitiesTotal / activitiesLast30Days (from activityRelations_deduplicated_ds) - - organizationsTotal / organizationsLast30Days (distinct orgs created in the last 30 days) - - membersTotal / membersLast30Days (distinct members created in the last 30 days) - Assumptions: - - Hierarchy is tree-like: a sub-project belongs to exactly one parent/grandparent. - - A member present in multiple sub-projects under the same segment subtree is counted ONCE per segmentId. - - "Last 30 days" for members/orgs means "created in the last 30 days". + CDP dashboard metrics per segment with hierarchy aggregation. + Clean approach assuming segments table is properly synced with complete hierarchy data. -NODE segmentsHierarchyMapping +NODE subprojectMetrics SQL > - -- For every segment, map it to its descendant subprojects only - -- Only subprojects (leaf segments) should have activities in activityRelations_deduplicated_ds - -- Project groups aggregate from their descendant subprojects - -- Projects aggregate from their direct child subprojects - -- Subprojects aggregate only their own activities - SELECT id AS segmentId, id AS childSegmentId + SELECT + segments.id AS segmentId, + 'subproject' AS segmentType, + segments.parentId, + segments.grandparentId, + segments.slug AS segmentSlug, + segments.parentSlug, + segments.grandparentSlug, + segments.name AS segmentName, + segments.parentName, + segments.grandparentName, + count() AS activitiesTotal, + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days FROM segments FINAL - WHERE (parentSlug IS NULL AND grandparentSlug IS NULL) - OR (parentSlug IS NOT NULL AND grandparentSlug IS NULL) - OR (parentSlug IS NOT NULL AND grandparentSlug IS NOT NULL) - UNION ALL - -- Project groups include all their descendant subprojects - SELECT s_pg.id AS segmentId, s_sub.id AS childSegmentId - FROM (SELECT * FROM segments FINAL) AS s_pg - INNER JOIN (SELECT * FROM segments FINAL) AS s_sub ON s_sub.grandparentId = s_pg.id - WHERE s_pg.parentSlug IS NULL AND s_pg.grandparentSlug IS NULL - AND s_sub.parentSlug IS NOT NULL AND s_sub.grandparentSlug IS NOT NULL - AND s_pg.grandparentId IS NULL AND s_pg.parentId IS NULL - AND s_sub.grandparentId != '' AND s_sub.parentId != '' - UNION ALL - -- Projects include their direct child subprojects - SELECT s_p.id AS segmentId, s_sub.id AS childSegmentId - FROM (SELECT * FROM segments FINAL) AS s_p - INNER JOIN (SELECT * FROM segments FINAL) AS s_sub ON s_sub.parentId = s_p.id - WHERE s_p.parentSlug IS NOT NULL AND s_p.grandparentSlug IS NULL - AND s_sub.parentSlug IS NOT NULL AND s_sub.grandparentSlug IS NOT NULL - AND s_p.grandparentId IS NULL AND s_p.parentId != '' - AND s_sub.grandparentId != '' AND s_sub.parentId != '' + LEFT JOIN activityRelations_deduplicated_ds AS ar ON segments.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE segments.parentSlug IS NOT NULL AND segments.grandparentSlug IS NOT NULL + AND segments.parentId IS NOT NULL AND segments.grandparentId IS NOT NULL + AND segments.parentId != '' AND segments.grandparentId != '' + GROUP BY segments.id, segments.parentId, segments.grandparentId, segments.slug, segments.parentSlug, segments.grandparentSlug, + segments.name, segments.parentName, segments.grandparentName + +NODE projectMetrics +SQL > + SELECT + s.parentId AS segmentId, + 'project' AS segmentType, + s.parentId, + s.grandparentId, + s.parentSlug AS segmentSlug, + s.parentSlug, + s.grandparentSlug, + s.parentName AS segmentName, + s.parentName, + s.grandparentName, + count() AS activitiesTotal, + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM (SELECT * FROM segments FINAL) AS s + LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL + AND s.parentId != '' AND s.grandparentId != '' + GROUP BY s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName -NODE segmentMetricsFromActivities +NODE projectGroupMetrics SQL > SELECT - m.segmentId AS segmentId, - -- activity metrics + s.grandparentId AS segmentId, + 'projectGroup' AS segmentType, + s.grandparentId AS parentId, + s.grandparentId, + s.grandparentSlug AS segmentSlug, + s.grandparentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.grandparentName AS segmentName, + s.grandparentName AS parentName, + s.grandparentName AS grandparentName, count() AS activitiesTotal, - countIf(a.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, - -- member metrics - countDistinct(a.memberId) AS membersTotal, - countDistinctIf(a.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, - -- organization metrics - countDistinct(a.organizationId) AS organizationsTotal, - countDistinctIf( - a.organizationId, org.createdAt >= now() - INTERVAL 30 DAY - ) AS organizationsLast30Days - FROM activityRelations_deduplicated_ds AS a - INNER JOIN segmentsHierarchyMapping AS m ON a.segmentId = m.childSegmentId - LEFT JOIN members AS mem FINAL ON a.memberId = mem.id - LEFT JOIN organizations AS org FINAL ON a.organizationId = org.id - GROUP BY m.segmentId + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM (SELECT * FROM segments FINAL) AS s + LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL + AND s.parentId != '' AND s.grandparentId != '' + GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName NODE cdpDashboardMetricsPerSegment SQL > - SELECT * FROM segmentMetricsFromActivities + SELECT * FROM subprojectMetrics + UNION ALL + SELECT * FROM projectMetrics + UNION ALL + SELECT * FROM projectGroupMetrics TYPE SINK EXPORT_SERVICE kafka @@ -73,4 +101,4 @@ EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging EXPORT_SCHEDULE 0 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new -EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink \ No newline at end of file From 437daa68a18078dffed7e2dcde71f9d620be8a85 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Fri, 19 Dec 2025 11:32:50 +0100 Subject: [PATCH 10/11] feat: update dockerfile image version --- scripts/scaffold/kafka-connect/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index 1aacf70d27..c8b92c65f3 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -8,7 +8,7 @@ RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prom RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ -COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar +COPY tmp/custom-plugins/tinybird-smt-1.0.4.jar /usr/share/java/tinybird-smt-1.0.4.jar VOLUME /storage From 411a62b9b4a8cd99bff8288a05f7021fab34d6c9 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 30 Dec 2025 10:41:48 +0100 Subject: [PATCH 11/11] feat: update docker image --- scripts/scaffold/kafka-connect/Dockerfile | 2 +- ...p_dashboard_metrics_per_segments_sink.pipe | 104 ------------------ ...cdp_dashboard_metrics_subproject_sink.pipe | 96 ++++++++++++++++ .../cdp_dashboard_metrics_total_sink.pipe | 83 ++++++++------ 4 files changed, 145 insertions(+), 140 deletions(-) delete mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index c8b92c65f3..b77e0e779c 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -8,7 +8,7 @@ RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prom RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ -COPY tmp/custom-plugins/tinybird-smt-1.0.4.jar /usr/share/java/tinybird-smt-1.0.4.jar +COPY tmp/custom-plugins/tinybird-smt-1.0.8.jar /usr/share/java/tinybird-smt-1.0.8.jar VOLUME /storage diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe deleted file mode 100644 index 03771a9cec..0000000000 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe +++ /dev/null @@ -1,104 +0,0 @@ -DESCRIPTION > - CDP dashboard metrics per segment with hierarchy aggregation. - Clean approach assuming segments table is properly synced with complete hierarchy data. - -NODE subprojectMetrics -SQL > - SELECT - segments.id AS segmentId, - 'subproject' AS segmentType, - segments.parentId, - segments.grandparentId, - segments.slug AS segmentSlug, - segments.parentSlug, - segments.grandparentSlug, - segments.name AS segmentName, - segments.parentName, - segments.grandparentName, - count() AS activitiesTotal, - countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, - countDistinct(ar.memberId) AS membersTotal, - countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, - countDistinct(ar.organizationId) AS organizationsTotal, - countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days - FROM segments FINAL - LEFT JOIN activityRelations_deduplicated_ds AS ar ON segments.id = ar.segmentId - LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id - LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id - WHERE segments.parentSlug IS NOT NULL AND segments.grandparentSlug IS NOT NULL - AND segments.parentId IS NOT NULL AND segments.grandparentId IS NOT NULL - AND segments.parentId != '' AND segments.grandparentId != '' - GROUP BY segments.id, segments.parentId, segments.grandparentId, segments.slug, segments.parentSlug, segments.grandparentSlug, - segments.name, segments.parentName, segments.grandparentName - -NODE projectMetrics -SQL > - SELECT - s.parentId AS segmentId, - 'project' AS segmentType, - s.parentId, - s.grandparentId, - s.parentSlug AS segmentSlug, - s.parentSlug, - s.grandparentSlug, - s.parentName AS segmentName, - s.parentName, - s.grandparentName, - count() AS activitiesTotal, - countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, - countDistinct(ar.memberId) AS membersTotal, - countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, - countDistinct(ar.organizationId) AS organizationsTotal, - countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days - FROM (SELECT * FROM segments FINAL) AS s - LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId - LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id - LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id - WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL - AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL - AND s.parentId != '' AND s.grandparentId != '' - GROUP BY s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName - -NODE projectGroupMetrics -SQL > - SELECT - s.grandparentId AS segmentId, - 'projectGroup' AS segmentType, - s.grandparentId AS parentId, - s.grandparentId, - s.grandparentSlug AS segmentSlug, - s.grandparentSlug AS parentSlug, - s.grandparentSlug AS grandparentSlug, - s.grandparentName AS segmentName, - s.grandparentName AS parentName, - s.grandparentName AS grandparentName, - count() AS activitiesTotal, - countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, - countDistinct(ar.memberId) AS membersTotal, - countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, - countDistinct(ar.organizationId) AS organizationsTotal, - countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days - FROM (SELECT * FROM segments FINAL) AS s - LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId - LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id - LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id - WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL - AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL - AND s.parentId != '' AND s.grandparentId != '' - GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName - -NODE cdpDashboardMetricsPerSegment -SQL > - SELECT * FROM subprojectMetrics - UNION ALL - SELECT * FROM projectMetrics - UNION ALL - SELECT * FROM projectGroupMetrics - -TYPE SINK -EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging -EXPORT_SCHEDULE 0 9 * * * -EXPORT_FORMAT json -EXPORT_STRATEGY @new -EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink \ No newline at end of file diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe new file mode 100644 index 0000000000..c997992aea --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe @@ -0,0 +1,96 @@ +DESCRIPTION > + CDP dashboard metrics per segment - PROJECT level. + Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV. + "Last30Days" is based on lastActive >= now() - 30 days. + +NODE memberSegmentAgg +SQL > + -- Finalized member x segment aggregates + SELECT + segmentId, + memberId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive + FROM cdp_member_segment_aggregates_MV + GROUP BY + segmentId, + memberId + +NODE organizationSegmentAgg +SQL > + -- Finalized organization x segment aggregates + SELECT + segmentId, + organizationId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive + FROM cdp_organization_segment_aggregates_MV + GROUP BY + segmentId, + organizationId + +NODE projectMetrics +SQL > + SELECT + s.parentId AS segmentId, + 'project' AS segmentType, + s.parentId AS parentId, + s.grandparentId AS grandparentId, + s.parentSlug AS segmentSlug, + s.parentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.parentName AS segmentName, + s.parentName AS parentName, + s.grandparentName AS grandparentName, + + -- aggregate all subprojects under the same projectId + sum(ms.activityCount) AS activitiesTotal, + sumIf(ms.activityCount, ms.lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + + uniqCombined(ms.memberId) AS membersTotal, + uniqCombinedIf(ms.memberId, ms.lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days, + + uniqCombined(os.organizationId) AS organizationsTotal, + uniqCombinedIf(os.organizationId, os.lastActive >= now() - INTERVAL 30 DAY) + AS organizationsLast30Days + FROM + ( + -- segment hierarchy filter: only fully-populated segments + SELECT + id, + parentId, + grandparentId, + slug, + parentSlug, + grandparentSlug, + name, + parentName, + grandparentName + FROM segments + WHERE + parentSlug IS NOT NULL + AND grandparentSlug IS NOT NULL + AND parentId IS NOT NULL + AND grandparentId IS NOT NULL + AND parentId != '' + AND grandparentId != '' + ) AS s + LEFT JOIN memberSegmentAgg AS ms + ON ms.segmentId = s.id + LEFT JOIN organizationSegmentAgg AS os + ON os.segmentId = s.id + GROUP BY + s.parentId, + s.grandparentId, + s.parentSlug, + s.grandparentSlug, + s.parentName, + s.grandparentName + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_SCHEDULE 5 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe index c38a151efd..f614314e38 100644 --- a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe @@ -1,45 +1,58 @@ DESCRIPTION > - Global metrics for activities, organizations, and members used for CDP dashboard. We. are referring to the total witouth filtering by any segment + Global metrics for activities, organizations, and members used for CDP dashboard. + Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV. + "Last30Days" is based on lastActive >= now() - 30 days. -NODE activityRelationsMetricsTotal +NODE cdpDashboardMetricsTotal SQL > SELECT - count() AS activitiesTotal, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days - FROM activityRelations_deduplicated_ds - -NODE organizationsMetricsTotal -SQL > - SELECT - count() AS organizationsTotal, - countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days - FROM organizations FINAL - -NODE membersMetricsTotal -SQL > - SELECT count() AS membersTotal, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days - FROM members FINAL - -NODE mergeResults -SQL > - SELECT - -- activity - (SELECT activitiesTotal FROM activityRelationsMetricsTotal) AS activitiesTotal, - (SELECT activitiesLast30Days FROM activityRelationsMetricsTotal) AS activitiesLast30Days, - -- organizations - (SELECT organizationsTotal FROM organizationsMetricsTotal) AS organizationsTotal, - (SELECT organizationsLast30Days FROM organizationsMetricsTotal) AS organizationsLast30Days, - -- members - (SELECT membersTotal FROM membersMetricsTotal) AS membersTotal, - (SELECT membersLast30Days FROM membersMetricsTotal) AS membersLast30Days - -NODE cdpDashboardFullMetricsTotal -SQL > - SELECT * FROM mergeResults + ms.activitiesTotal, + ms.activitiesLast30Days, + ms.membersTotal, + ms.membersLast30Days, + os.organizationsTotal, + os.organizationsLast30Days + FROM + ( + -- member-based global metrics (single scan over cdp_member_segment_aggregates_MV) + SELECT + sum(activityCount) AS activitiesTotal, + sumIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + uniqCombined(memberId) AS membersTotal, + uniqCombinedIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days + FROM + ( + -- finalize AggregateFunction states per member + SELECT + memberId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive + FROM cdp_member_segment_aggregates_MV + GROUP BY + memberId + ) AS m + ) AS ms + CROSS JOIN + ( + -- organization-based global metrics (single scan over cdp_organization_segment_aggregates_MV) + SELECT + uniqCombined(organizationId) AS organizationsTotal, + uniqCombinedIf(organizationId, lastActive >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM + ( + -- finalize AggregateFunction states per organization + SELECT + organizationId, + maxMerge(lastActiveState) AS lastActive + FROM cdp_organization_segment_aggregates_MV + GROUP BY + organizationId + ) AS o + ) AS os TYPE SINK EXPORT_SERVICE kafka -EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming EXPORT_SCHEDULE 0 9 * * * EXPORT_FORMAT json EXPORT_STRATEGY @new