Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DESCRIPTION >
Segment-level aggregate states (one row per segmentId per snapshot).
Stores AggregateFunction states: count states for activity totals and
uniqCombined states for distinct member / organization rollups.

SCHEMA >
`snapshotDate` Date,
`segmentId` String,
`parentId` String,
`grandparentId` String,
`activitiesTotalState` AggregateFunction(count),
`activitiesLast30DaysState` AggregateFunction(count),
`membersUniqState` AggregateFunction(uniqCombined, String),
`membersLast30UniqState` AggregateFunction(uniqCombined, String),
`orgsUniqState` AggregateFunction(uniqCombined, String),
`orgsLast30UniqState` AggregateFunction(uniqCombined, String)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY snapshotDate
ENGINE_SORTING_KEY (snapshotDate, segmentId)
109 changes: 109 additions & 0 deletions services/libs/tinybird/pipes/cdp_segment_metrics_copy_pipe.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
DESCRIPTION >
Daily batch job that builds segment-level aggregate states from existing
member and organization segment aggregate datasources.
Activities computed from activityRelations_enriched_deduplicated_ds (latest snapshotId).
Optimized: compute only for "valid segments" early, compute latest snapshot once,
and reuse empty states as constants.

NODE validSegments
SQL >
    -- Segments with a complete hierarchy: parent and grandparent slugs are set,
    -- and parent and grandparent ids are set and non-empty.
    SELECT
        id,
        parentId,
        grandparentId
    FROM segments
    WHERE
        parentId IS NOT NULL
        AND parentId != ''
        AND parentSlug IS NOT NULL
        AND grandparentId IS NOT NULL
        AND grandparentId != ''
        AND grandparentSlug IS NOT NULL

NODE activitiesStates
SQL >
    -- Per-segment count states for all activities and for activities in the
    -- last 30 days, built in ONE scan of the latest snapshot and restricted to
    -- valid segments.
    SELECT
        ar.segmentId,
        countState(CAST(1 AS UInt8)) AS activitiesTotalState,
        -- Rows outside the 30-day window yield NULL, which count() ignores.
        countState(
            if(ar.timestamp >= now() - INTERVAL 30 DAY, CAST(1 AS UInt8), NULL)
        ) AS activitiesLast30DaysState
    FROM activityRelations_enriched_deduplicated_ds AS ar
    WHERE
        -- Only the most recent snapshot of the source datasource.
        ar.snapshotId = (
            SELECT snapshotId
            FROM activityRelations_enriched_deduplicated_ds
            ORDER BY snapshotId DESC
            LIMIT 1
        )
        AND ar.segmentId IS NOT NULL
        AND ar.segmentId != ''
        -- Skip segments that segmentAggStates would discard anyway, so no
        -- states are built for them.
        AND ar.segmentId IN (SELECT id FROM validSegments)
    GROUP BY ar.segmentId

NODE memberPerSegment
SQL >
    -- One row per (segmentId, memberId) carrying the merged last-active value,
    -- limited to segments present in validSegments.
    SELECT
        agg.segmentId,
        agg.memberId,
        maxMerge(agg.lastActiveState) AS lastActive
    FROM cdp_member_segment_aggregates_ds AS agg
    ANY INNER JOIN validSegments AS seg
        ON seg.id = agg.segmentId
    GROUP BY
        agg.segmentId,
        agg.memberId

NODE memberAllTimeStates
SQL >
    -- All-time distinct-member state per segment.
    SELECT
        segmentId,
        uniqCombinedState(memberId) AS membersUniqState
    FROM memberPerSegment
    GROUP BY segmentId

NODE memberLast30States
SQL >
    -- Distinct-member state per segment, counting only members whose
    -- lastActive falls within the last 30 days.
    SELECT
        segmentId,
        uniqCombinedState(memberId) AS membersLast30UniqState
    FROM memberPerSegment
    WHERE
        lastActive >= now() - INTERVAL 30 DAY
    GROUP BY segmentId

NODE orgPerSegment
SQL >
    -- One row per (segmentId, organizationId) carrying the merged last-active
    -- value, limited to segments present in validSegments.
    SELECT
        agg.segmentId,
        agg.organizationId,
        maxMerge(agg.lastActiveState) AS lastActive
    FROM cdp_organization_segment_aggregates_ds AS agg
    ANY INNER JOIN validSegments AS seg
        ON seg.id = agg.segmentId
    GROUP BY
        agg.segmentId,
        agg.organizationId

NODE orgAllTimeStates
SQL >
    -- All-time distinct-organization state per segment.
    SELECT
        segmentId,
        uniqCombinedState(organizationId) AS orgsUniqState
    FROM orgPerSegment
    GROUP BY segmentId

NODE orgLast30States
SQL >
    -- Distinct-organization state per segment, counting only organizations
    -- whose lastActive falls within the last 30 days.
    SELECT
        segmentId,
        uniqCombinedState(organizationId) AS orgsLast30UniqState
    FROM orgPerSegment
    WHERE
        lastActive >= now() - INTERVAL 30 DAY
    GROUP BY segmentId

NODE segmentAggStates
SQL >
    -- Final copy node: one row per valid segment with its hierarchy ids, the
    -- snapshot date of this run, and every aggregate state.
    -- The two scalar subqueries aggregate over zero rows to produce empty
    -- states, reused as fallbacks for segments with no matching rows.
    WITH
        (SELECT countState(CAST(1 AS UInt8)) FROM system.one WHERE 0) AS emptyCountU8State,
        (SELECT uniqCombinedState(CAST('' AS String)) FROM system.one WHERE 0) AS emptyUniqStringState
    SELECT
        -- The target datasource is partitioned and sorted by snapshotDate and
        -- the sink pipes select max(snapshotDate), so it must be set here.
        today() AS snapshotDate,
        vs.id AS segmentId,
        vs.parentId AS parentId,
        vs.grandparentId AS grandparentId,
        ifNull(a.activitiesTotalState, emptyCountU8State) AS activitiesTotalState,
        ifNull(a.activitiesLast30DaysState, emptyCountU8State) AS activitiesLast30DaysState,
        ifNull(m.membersUniqState, emptyUniqStringState) AS membersUniqState,
        ifNull(ml30.membersLast30UniqState, emptyUniqStringState) AS membersLast30UniqState,
        ifNull(o.orgsUniqState, emptyUniqStringState) AS orgsUniqState,
        ifNull(ol30.orgsLast30UniqState, emptyUniqStringState) AS orgsLast30UniqState
    FROM validSegments AS vs
    LEFT JOIN activitiesStates AS a ON a.segmentId = vs.id
    LEFT JOIN memberAllTimeStates AS m ON m.segmentId = vs.id
    LEFT JOIN memberLast30States AS ml30 ON ml30.segmentId = vs.id
    LEFT JOIN orgAllTimeStates AS o ON o.segmentId = vs.id
    LEFT JOIN orgLast30States AS ol30 ON ol30.segmentId = vs.id

TYPE COPY
TARGET_DATASOURCE cdp_segment_metrics_ds
COPY_MODE replace
COPY_SCHEDULE 0 9 * * *
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
DESCRIPTION >
CDP dashboard metrics per segment - PROJECT GROUP level.
Rolls up subproject aggregate states via grandparentId.

NODE projectGroupMetrics
SQL >
    -- Merge the latest-snapshot states of all segments that share a
    -- grandparentId and finalize them into plain numbers.
    -- At this level the parent and grandparent fields repeat the group's own
    -- id, slug and name.
    SELECT
        seg.grandparentId AS segmentId,
        'projectGroup' AS segmentType,
        any(seg.grandparentId) AS parentId,
        any(seg.grandparentId) AS grandparentId,
        any(seg.grandparentSlug) AS segmentSlug,
        any(seg.grandparentSlug) AS parentSlug,
        any(seg.grandparentSlug) AS grandparentSlug,
        max(seg.grandparentName) AS segmentName,
        max(seg.grandparentName) AS parentName,
        max(seg.grandparentName) AS grandparentName,
        countMerge(metrics.activitiesTotalState) AS activitiesTotal,
        countMerge(metrics.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(metrics.membersUniqState) AS membersTotal,
        uniqCombinedMerge(metrics.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(metrics.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(metrics.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS seg
    LEFT JOIN cdp_segment_metrics_ds AS metrics
        ON metrics.segmentId = seg.id
        -- Only rows from the most recent snapshot take part in the rollup.
        AND metrics.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
    WHERE
        seg.parentId IS NOT NULL
        AND seg.parentId != ''
        AND seg.parentSlug IS NOT NULL
        AND seg.grandparentId IS NOT NULL
        AND seg.grandparentId != ''
        AND seg.grandparentSlug IS NOT NULL
    GROUP BY seg.grandparentId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 35 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
45 changes: 45 additions & 0 deletions services/libs/tinybird/pipes/cdp_segment_metrics_project_sink.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
DESCRIPTION >
CDP dashboard metrics per segment - PROJECT level.
Rolls up subproject aggregate states via parentId.

NODE projectMetrics
SQL >
    -- Merge the latest-snapshot states of all segments that share a parentId
    -- and finalize them into plain numbers.
    -- At this level both the parent and grandparent fields carry the
    -- grandparent's id, slug and name.
    SELECT
        seg.parentId AS segmentId,
        'project' AS segmentType,
        any(seg.grandparentId) AS parentId,
        any(seg.grandparentId) AS grandparentId,
        -- Slugs and names: take the anyHeavy() value unless it is the empty
        -- string, in which case fall back to any().
        coalesce(nullIf(anyHeavy(seg.parentSlug), ''), any(seg.parentSlug)) AS segmentSlug,
        coalesce(nullIf(anyHeavy(seg.grandparentSlug), ''), any(seg.grandparentSlug)) AS parentSlug,
        coalesce(nullIf(anyHeavy(seg.grandparentSlug), ''), any(seg.grandparentSlug)) AS grandparentSlug,
        coalesce(nullIf(anyHeavy(seg.parentName), ''), any(seg.parentName)) AS segmentName,
        coalesce(nullIf(anyHeavy(seg.grandparentName), ''), any(seg.grandparentName)) AS parentName,
        coalesce(nullIf(anyHeavy(seg.grandparentName), ''), any(seg.grandparentName)) AS grandparentName,
        countMerge(metrics.activitiesTotalState) AS activitiesTotal,
        countMerge(metrics.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(metrics.membersUniqState) AS membersTotal,
        uniqCombinedMerge(metrics.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(metrics.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(metrics.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS seg
    LEFT JOIN cdp_segment_metrics_ds AS metrics
        ON metrics.segmentId = seg.id
        -- Only rows from the most recent snapshot take part in the rollup.
        AND metrics.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
    WHERE
        seg.parentId IS NOT NULL
        AND seg.parentId != ''
        AND seg.parentSlug IS NOT NULL
        AND seg.grandparentId IS NOT NULL
        AND seg.grandparentId != ''
        AND seg.grandparentSlug IS NOT NULL
    GROUP BY seg.parentId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 40 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
DESCRIPTION >
CDP dashboard metrics per segment - SUBPROJECT level.
Finalizes segment-level aggregate states and exports them.

NODE subprojectMetrics
SQL >
    -- Finalize the latest-snapshot states of each individual segment into
    -- plain numbers, alongside its own and its ancestors' ids, slugs and names.
    SELECT
        seg.id AS segmentId,
        'subproject' AS segmentType,
        any(seg.parentId) AS parentId,
        any(seg.grandparentId) AS grandparentId,
        any(seg.slug) AS segmentSlug,
        any(seg.parentSlug) AS parentSlug,
        any(seg.grandparentSlug) AS grandparentSlug,
        max(seg.name) AS segmentName,
        max(seg.parentName) AS parentName,
        max(seg.grandparentName) AS grandparentName,
        countMerge(metrics.activitiesTotalState) AS activitiesTotal,
        countMerge(metrics.activitiesLast30DaysState) AS activitiesLast30Days,
        uniqCombinedMerge(metrics.membersUniqState) AS membersTotal,
        uniqCombinedMerge(metrics.membersLast30UniqState) AS membersLast30Days,
        uniqCombinedMerge(metrics.orgsUniqState) AS organizationsTotal,
        uniqCombinedMerge(metrics.orgsLast30UniqState) AS organizationsLast30Days
    FROM segments AS seg
    LEFT JOIN cdp_segment_metrics_ds AS metrics
        ON metrics.segmentId = seg.id
        -- Only rows from the most recent snapshot are finalized.
        AND metrics.snapshotDate = (SELECT max(snapshotDate) FROM cdp_segment_metrics_ds)
    WHERE
        seg.parentId IS NOT NULL
        AND seg.parentId != ''
        AND seg.parentSlug IS NOT NULL
        AND seg.grandparentId IS NOT NULL
        AND seg.grandparentId != ''
        AND seg.grandparentSlug IS NOT NULL
    GROUP BY seg.id

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 30 9 * * *
EXPORT_FORMAT json
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink