Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
SCHEMA >
`segmentId` String,
`memberId` String,
`tenantId` String,
`activityCountState` AggregateFunction(count, String),
`lastActiveState` AggregateFunction(max, DateTime64(3)),
`activityTypesState` AggregateFunction(groupArrayDistinct, String),
`activeOnState` AggregateFunction(groupArrayDistinct, String),
`averageSentimentState` AggregateFunction(avg, Int8),
`lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)),
`updatedAt` DateTime64(3)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY segmentId, memberId
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
SCHEMA >
`segmentId` String,
`organizationId` String,
`tenantId` String,
`joinedAtState` AggregateFunction(min, DateTime64(3)),
`lastActiveState` AggregateFunction(max, DateTime64(3)),
`activeOnState` AggregateFunction(groupArrayDistinct, String),
`activityCountState` AggregateFunction(count, String),
`memberCountState` AggregateFunction(countDistinct, String),
`avgContributorEngagement` AggregateFunction(avg, Int8),
`lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)),
`updatedAt` DateTime64(3)

ENGINE AggregatingMergeTree
ENGINE_PARTITION_KEY toYear(updatedAt)
ENGINE_SORTING_KEY segmentId, organizationId
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
NODE leaf_segment_aggregates
SQL >
%
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
maxMerge(updatedAtState) AS updatedAt
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: References non-existent column updatedAtState in queries

The query references maxMerge(updatedAtState) but this column doesn't exist in cdp_member_segment_aggregates_ds. The datasource schema defines lastActivityUpdatedAtState (an aggregate function) and updatedAt (a regular timestamp), not updatedAtState. This will cause the query to fail at runtime. The same issue occurs in all three nodes: leaf_segment_aggregates, parent_segment_aggregates, and grandparent_segment_aggregates.

Additional Locations (2)

Fix in Cursor Fix in Web

FROM cdp_member_segment_aggregates_ds
{% if defined(bucket_id) %}
WHERE
cityHash64(segmentId) % 10
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
GROUP BY segmentId, memberId

NODE parent_segment_aggregates
SQL >
%
SELECT
parentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
maxMerge(updatedAtState) AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
{% if defined(bucket_id) %}
WHERE
cityHash64(segmentId) % 10
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
GROUP BY parentId, memberId

NODE grandparent_segment_aggregates
SQL >
%
SELECT
grandparentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
maxMerge(updatedAtState) AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
{% if defined(bucket_id) %}
WHERE
cityHash64(grandparentId) % 10
= {{
UInt8(
bucket_id,
0,
description="This is bucket id of the activity segment",
required=False,
)
}}
{% end %}
GROUP BY grandparentId, memberId

NODE cdp_member_segment_aggs_union
SQL >
select *
from leaf_segment_aggregates
union all
select *
from parent_segment_aggregates
union all
select *
from grandparent_segment_aggregates

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE @on-demand
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Kafka topic name typo sends data to wrong topic

The backfiller sink exports to memberSegmentsAggs_sink (with extra 's') while all other member aggregate sinks export to memberSegmentsAgg_sink. This inconsistency will cause backfill data to be sent to a different Kafka topic than the incremental updates, potentially causing data to be lost or processed incorrectly by downstream consumers.

Fix in Cursor Fix in Web

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
NODE members_with_changed_aggs_previous_day
SQL >
select distinct memberId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)

NODE segments_with_changed_aggs_previous_day
SQL >
select id as segmentId
from segments
where
grandparentId in (
select grandparentId
from segments
where
id in (
select distinct segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)

NODE grandparent_segment_aggs_updated_previous_day
SQL >
SELECT
grandparentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
where
cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day)
and memberId in (select memberId from members_with_changed_aggs_previous_day)
GROUP BY grandparentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 30 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
NODE leaf_segment_aggs_updated_previous_day
SQL >
%
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() as updatedAt
FROM cdp_member_segment_aggregates_ds
WHERE
(memberId, segmentId) in (
select distinct memberId, segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
GROUP BY segmentId, memberId, updatedAt

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
NODE members_with_changed_aggs_previous_day
SQL >
select distinct memberId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)

NODE segments_with_changed_aggs_previous_day
SQL >
select id as segmentId
from segments
where
parentId in (
select parentId
from segments
where
id in (
select distinct segmentId
from cdp_member_segment_aggregates_ds
where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY)
)
)

NODE cdp_member_aggregates_sink_daily_parent_segments_1
SQL >
%
SELECT
parentId as segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countMerge(activityCountState) AS activityCount,
maxMerge(lastActiveState) AS lastActive,
groupArrayDistinctMerge(activityTypesState) AS activityTypes,
groupArrayDistinctMerge(activeOnState) AS activeOn,
avgMerge(averageSentimentState) AS averageSentiment,
now() AS updatedAt
FROM cdp_member_segment_aggregates_ds as cdp_aggs
join segments s on s.id = cdp_aggs.segmentId
where
cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day)
and memberId in (select memberId from members_with_changed_aggs_previous_day)
GROUP BY parentId, memberId

TYPE SINK
EXPORT_SERVICE kafka
EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming
EXPORT_SCHEDULE 0 1 * * *
EXPORT_FORMAT csv
EXPORT_STRATEGY @new
EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink
18 changes: 18 additions & 0 deletions services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
NODE cdp_member_aggregates_sink_initial_snapshot_0
SQL >
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countState(activityId) AS activityCountState,
maxState(timestamp) AS lastActiveState,
groupArrayDistinctState(type) AS activityTypesState,
groupArrayDistinctState(platform) AS activeOnState,
avgState(sentimentScore) AS averageSentimentState,
maxState(updatedAt) AS lastActivityUpdatedAtState,
now64(3) as updatedAt
FROM activityRelations_enrich_snapshot_MV_ds
GROUP BY segmentId, memberId

TYPE MATERIALIZED
DATASOURCE cdp_member_segment_aggregates_ds
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
NODE cdp_member_aggregates_sink_initial_snapshot_0
SQL >
SELECT
segmentId,
memberId,
'875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId,
countState(activityId) AS activityCountState,
maxState(timestamp) AS lastActiveState,
groupArrayDistinctState(type) AS activityTypesState,
groupArrayDistinctState(platform) AS activeOnState,
avgState(sentimentScore) AS averageSentimentState,
maxState(act.updatedAt) as lastActivityUpdatedAtState,
max(act.updatedAt) as updatedAt
FROM activityRelations_enriched_deduplicated_ds act
GROUP BY segmentId, memberId

TYPE COPY
TARGET_DATASOURCE cdp_member_segment_aggregates_ds
COPY_MODE replace
COPY_SCHEDULE @on-demand
Loading
Loading