Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions scripts/scaffold/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ USER root
RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt

COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/
COPY tmp/custom-plugins/tinybird-smt-1.0.8.jar /usr/share/java/tinybird-smt-1.0.8.jar


VOLUME /storage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
DESCRIPTION >
CDP dashboard metrics per segment - PROJECT level.
Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV.
"Last30Days" is based on lastActive >= now() - 30 days.

NODE memberSegmentAgg
SQL >
-- Finalized member x segment aggregates
SELECT
segmentId,
memberId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive
FROM cdp_member_segment_aggregates_MV
GROUP BY
segmentId,
memberId

NODE organizationSegmentAgg
SQL >
-- Finalized organization x segment aggregates
SELECT
segmentId,
organizationId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive
FROM cdp_organization_segment_aggregates_MV
GROUP BY
segmentId,
organizationId

NODE projectMetrics
SQL >
SELECT
s.parentId AS segmentId,
'project' AS segmentType,
s.parentId AS parentId,
s.grandparentId AS grandparentId,
s.parentSlug AS segmentSlug,
s.parentSlug AS parentSlug,
s.grandparentSlug AS grandparentSlug,
s.parentName AS segmentName,
s.parentName AS parentName,
s.grandparentName AS grandparentName,

-- aggregate all subprojects under the same projectId
sum(ms.activityCount) AS activitiesTotal,
sumIf(ms.activityCount, ms.lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,

uniqCombined(ms.memberId) AS membersTotal,
uniqCombinedIf(ms.memberId, ms.lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days,

uniqCombined(os.organizationId) AS organizationsTotal,
uniqCombinedIf(os.organizationId, os.lastActive >= now() - INTERVAL 30 DAY)
AS organizationsLast30Days
FROM
(
-- segment hierarchy filter: only fully-populated segments
SELECT
id,
parentId,
grandparentId,
slug,
parentSlug,
grandparentSlug,
name,
parentName,
grandparentName
FROM segments
WHERE
parentSlug IS NOT NULL
AND grandparentSlug IS NOT NULL
AND parentId IS NOT NULL
AND grandparentId IS NOT NULL
AND parentId != ''
AND grandparentId != ''
) AS s
LEFT JOIN memberSegmentAgg AS ms
ON ms.segmentId = s.id
LEFT JOIN organizationSegmentAgg AS os
ON os.segmentId = s.id
GROUP BY
s.parentId,
s.grandparentId,
s.parentSlug,
s.grandparentSlug,
s.parentName,
s.grandparentName

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 5 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
59 changes: 59 additions & 0 deletions services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
DESCRIPTION >
Global metrics for activities, organizations, and members used for CDP dashboard.
Uses cdp_member_segment_aggregates_MV and cdp_organization_segment_aggregates_MV.
"Last30Days" is based on lastActive >= now() - 30 days.

NODE cdpDashboardMetricsTotal
SQL >
SELECT
ms.activitiesTotal,
ms.activitiesLast30Days,
ms.membersTotal,
ms.membersLast30Days,
os.organizationsTotal,
os.organizationsLast30Days
FROM
(
-- member-based global metrics (single scan over cdp_member_segment_aggregates_MV)
SELECT
sum(activityCount) AS activitiesTotal,
sumIf(activityCount, lastActive >= now() - INTERVAL 30 DAY) AS activitiesLast30Days,
uniqCombined(memberId) AS membersTotal,
uniqCombinedIf(memberId, lastActive >= now() - INTERVAL 30 DAY) AS membersLast30Days
FROM
(
-- finalize AggregateFunction states per member
SELECT
memberId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive
FROM cdp_member_segment_aggregates_MV
GROUP BY
memberId
) AS m
) AS ms
CROSS JOIN
(
-- organization-based global metrics (single scan over cdp_organization_segment_aggregates_MV)
SELECT
uniqCombined(organizationId) AS organizationsTotal,
uniqCombinedIf(organizationId, lastActive >= now() - INTERVAL 30 DAY) AS organizationsLast30Days
FROM
(
-- finalize AggregateFunction states per organization
SELECT
organizationId,
maxMerge(lastActiveState) AS lastActive
FROM cdp_organization_segment_aggregates_MV
GROUP BY
organizationId
) AS o
) AS os

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink
Loading