diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile
index 015bf97f41..b77e0e779c 100644
--- a/scripts/scaffold/kafka-connect/Dockerfile
+++ b/scripts/scaffold/kafka-connect/Dockerfile
@@ -5,7 +5,10 @@ USER root
 RUN yum install -y jq findutils unzip
 
 RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
+RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt
+
 COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/
+COPY tmp/custom-plugins/tinybird-smt-1.0.8.jar /usr/share/java/tinybird-smt-1.0.8.jar
 
 VOLUME /storage
 
diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe
new file mode 100644
index 0000000000..c997992aea
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_subproject_sink.pipe
@@ -0,0 +1,127 @@
+DESCRIPTION >
+    CDP dashboard metrics per segment - PROJECT level.
+    Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV.
+    "Last30Days" is based on lastActive >= now() - 30 days.
+    Member and organization aggregates are stacked (UNION ALL) and joined to segments once,
+    so member activity sums are not multiplied by the organization rows of a segment.
+
+NODE memberSegmentAgg
+SQL >
+    -- Finalized member x segment aggregates
+    SELECT
+        segmentId,
+        memberId,
+        countMerge(activityCountState) AS activityCount,
+        maxMerge(lastActiveState) AS lastActive
+    FROM cdp_member_segment_aggregates_MV
+    GROUP BY
+        segmentId,
+        memberId
+
+NODE organizationSegmentAgg
+SQL >
+    -- Finalized organization x segment aggregates
+    SELECT
+        segmentId,
+        organizationId,
+        countMerge(activityCountState) AS activityCount,
+        maxMerge(lastActiveState) AS lastActive
+    FROM cdp_organization_segment_aggregates_MV
+    GROUP BY
+        segmentId,
+        organizationId
+
+NODE segmentEntityAgg
+SQL >
+    -- Member and organization aggregates stacked into one row shape, tagged by entityType.
+    -- Joining both aggregates to segments side by side would pair every member row with
+    -- every organization row of the same segment and inflate sum(activityCount).
+    -- toString() keeps entityId a single type across both UNION ALL branches.
+    SELECT
+        segmentId,
+        'member' AS entityType,
+        toString(memberId) AS entityId,
+        activityCount,
+        lastActive
+    FROM memberSegmentAgg
+    UNION ALL
+    SELECT
+        segmentId,
+        'organization' AS entityType,
+        toString(organizationId) AS entityId,
+        activityCount,
+        lastActive
+    FROM organizationSegmentAgg
+
+NODE projectMetrics
+SQL >
+    -- One output row per project: subproject segments are rolled up to their parent.
+    SELECT
+        s.parentId AS segmentId,
+        'project' AS segmentType,
+        s.parentId AS parentId,
+        s.grandparentId AS grandparentId,
+        s.parentSlug AS segmentSlug,
+        s.parentSlug AS parentSlug,
+        s.grandparentSlug AS grandparentSlug,
+        s.parentName AS segmentName,
+        s.parentName AS parentName,
+        s.grandparentName AS grandparentName,
+
+        -- aggregate all subprojects under the same projectId; only 'member' rows carry activity
+        sumIf(a.activityCount, a.entityType = 'member') AS activitiesTotal,
+        sumIf(
+            a.activityCount,
+            a.entityType = 'member' AND a.lastActive >= now() - INTERVAL 30 DAY
+        ) AS activitiesLast30Days,
+
+        uniqCombinedIf(a.entityId, a.entityType = 'member') AS membersTotal,
+        uniqCombinedIf(
+            a.entityId,
+            a.entityType = 'member' AND a.lastActive >= now() - INTERVAL 30 DAY
+        ) AS membersLast30Days,
+
+        uniqCombinedIf(a.entityId, a.entityType = 'organization') AS organizationsTotal,
+        uniqCombinedIf(
+            a.entityId,
+            a.entityType = 'organization' AND a.lastActive >= now() - INTERVAL 30 DAY
+        ) AS organizationsLast30Days
+    FROM
+        (
+            -- segment hierarchy filter: only fully-populated segments
+            SELECT
+                id,
+                parentId,
+                grandparentId,
+                parentSlug,
+                grandparentSlug,
+                parentName,
+                grandparentName
+            FROM segments
+            WHERE
+                parentSlug IS NOT NULL
+                AND grandparentSlug IS NOT NULL
+                AND parentId IS NOT NULL
+                AND grandparentId IS NOT NULL
+                AND parentId != ''
+                AND grandparentId != ''
+        ) AS s
+    -- segments without any aggregate row stay in the result; their unmatched join row
+    -- matches neither entityType, so it contributes to none of the metrics
+    LEFT JOIN segmentEntityAgg AS a
+        ON a.segmentId = s.id
+    GROUP BY
+        s.parentId,
+        s.grandparentId,
+        s.parentSlug,
+        s.grandparentSlug,
+        s.parentName,
+        s.grandparentName
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 5 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe
new file mode 100644
index 0000000000..f614314e38
--- /dev/null
+++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe
@@ -0,0 +1,59 @@
+DESCRIPTION >
+    Global metrics for activities, organizations, and members used for CDP dashboard.
+    Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV.
+    "Last30Days" is based on lastActive >= now() - 30 days.
+
+NODE cdpDashboardMetricsTotal
+SQL >
+    SELECT
+        ms.activitiesTotal,
+        ms.activitiesLast30Days,
+        ms.membersTotal,
+        ms.membersLast30Days,
+        os.organizationsTotal,
+        os.organizationsLast30Days
+    FROM
+        (
+            -- member-based global metrics (single scan over cdp_member_segment_aggregates_MV)
+            SELECT
+                sum(activityCount) AS activitiesTotal,
+                sumIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
+                uniqCombined(memberId) AS membersTotal,
+                uniqCombinedIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days
+            FROM
+                (
+                    -- finalize AggregateFunction states per member
+                    SELECT
+                        memberId,
+                        countMerge(activityCountState) AS activityCount,
+                        maxMerge(lastActiveState) AS lastActive
+                    FROM cdp_member_segment_aggregates_MV
+                    GROUP BY
+                        memberId
+                ) AS m
+        ) AS ms
+    CROSS JOIN
+        (
+            -- organization-based global metrics (single scan over cdp_organization_segment_aggregates_MV)
+            SELECT
+                uniqCombined(organizationId) AS organizationsTotal,
+                uniqCombinedIf(organizationId, lastActive >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
+            FROM
+                (
+                    -- finalize AggregateFunction states per organization
+                    SELECT
+                        organizationId,
+                        maxMerge(lastActiveState) AS lastActive
+                    FROM cdp_organization_segment_aggregates_MV
+                    GROUP BY
+                        organizationId
+                ) AS o
+        ) AS os
+
+TYPE SINK
+EXPORT_SERVICE kafka
+EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
+EXPORT_SCHEDULE 0 9 * * *
+EXPORT_FORMAT json
+EXPORT_STRATEGY @new
+EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink