-
Notifications
You must be signed in to change notification settings - Fork 726
feat: pipes and datasources for CDP aggs (CDP-804) #3714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| SCHEMA > | ||
| `segmentId` String, | ||
| `memberId` String, | ||
| `tenantId` String, | ||
| `activityCountState` AggregateFunction(count, String), | ||
| `lastActiveState` AggregateFunction(max, DateTime64(3)), | ||
| `activityTypesState` AggregateFunction(groupArrayDistinct, String), | ||
| `activeOnState` AggregateFunction(groupArrayDistinct, String), | ||
| `averageSentimentState` AggregateFunction(avg, Int8), | ||
| `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), | ||
| `updatedAt` DateTime64(3) | ||
|
|
||
| ENGINE AggregatingMergeTree | ||
| ENGINE_PARTITION_KEY toYear(updatedAt) | ||
| ENGINE_SORTING_KEY segmentId, memberId |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| SCHEMA > | ||
| `segmentId` String, | ||
| `organizationId` String, | ||
| `tenantId` String, | ||
| `joinedAtState` AggregateFunction(min, DateTime64(3)), | ||
| `lastActiveState` AggregateFunction(max, DateTime64(3)), | ||
| `activeOnState` AggregateFunction(groupArrayDistinct, String), | ||
| `activityCountState` AggregateFunction(count, String), | ||
| `memberCountState` AggregateFunction(countDistinct, String), | ||
| `avgContributorEngagement` AggregateFunction(avg, Int8), | ||
| `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), | ||
| `updatedAt` DateTime64(3) | ||
|
|
||
| ENGINE AggregatingMergeTree | ||
| ENGINE_PARTITION_KEY toYear(updatedAt) | ||
| ENGINE_SORTING_KEY segmentId, organizationId |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| NODE leaf_segment_aggregates | ||
| SQL > | ||
| % | ||
| SELECT | ||
| segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| maxMerge(updatedAtState) AS updatedAt | ||
| FROM cdp_member_segment_aggregates_ds | ||
| {% if defined(bucket_id) %} | ||
| WHERE | ||
| cityHash64(segmentId) % 10 | ||
| = {{ | ||
| UInt8( | ||
| bucket_id, | ||
| 0, | ||
| description="This is bucket id of the activity segment", | ||
| required=False, | ||
| ) | ||
| }} | ||
| {% end %} | ||
| GROUP BY segmentId, memberId | ||
|
|
||
| NODE parent_segment_aggregates | ||
| SQL > | ||
| % | ||
| SELECT | ||
| parentId as segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| maxMerge(updatedAtState) AS updatedAt | ||
| FROM cdp_member_segment_aggregates_ds as cdp_aggs | ||
| join segments s on s.id = cdp_aggs.segmentId | ||
| {% if defined(bucket_id) %} | ||
| WHERE | ||
| cityHash64(segmentId) % 10 | ||
| = {{ | ||
| UInt8( | ||
| bucket_id, | ||
| 0, | ||
| description="This is bucket id of the activity segment", | ||
| required=False, | ||
| ) | ||
| }} | ||
| {% end %} | ||
| GROUP BY parentId, memberId | ||
|
|
||
| NODE grandparent_segment_aggregates | ||
| SQL > | ||
| % | ||
| SELECT | ||
| grandparentId as segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| maxMerge(updatedAtState) AS updatedAt | ||
| FROM cdp_member_segment_aggregates_ds as cdp_aggs | ||
| join segments s on s.id = cdp_aggs.segmentId | ||
| {% if defined(bucket_id) %} | ||
| WHERE | ||
| cityHash64(grandparentId) % 10 | ||
| = {{ | ||
| UInt8( | ||
| bucket_id, | ||
| 0, | ||
| description="This is bucket id of the activity segment", | ||
| required=False, | ||
| ) | ||
| }} | ||
| {% end %} | ||
| GROUP BY grandparentId, memberId | ||
|
|
||
| NODE cdp_member_segment_aggs_union | ||
| SQL > | ||
| select * | ||
| from leaf_segment_aggregates | ||
| union all | ||
| select * | ||
| from parent_segment_aggregates | ||
| union all | ||
| select * | ||
| from grandparent_segment_aggregates | ||
|
|
||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE @on-demand | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Kafka topic name typo sends data to wrong topicThe backfiller sink exports to |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| NODE members_with_changed_aggs_previous_day | ||
| SQL > | ||
| select distinct memberId | ||
| from cdp_member_segment_aggregates_ds | ||
| where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|
|
||
| NODE segments_with_changed_aggs_previous_day | ||
| SQL > | ||
| select id as segmentId | ||
| from segments | ||
| where | ||
| grandparentId in ( | ||
| select grandparentId | ||
| from segments | ||
| where | ||
| id in ( | ||
| select distinct segmentId | ||
| from cdp_member_segment_aggregates_ds | ||
| where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
| ) | ||
| ) | ||
|
|
||
| NODE grandparent_segment_aggs_updated_previous_day | ||
| SQL > | ||
| SELECT | ||
| grandparentId as segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| now() AS updatedAt | ||
| FROM cdp_member_segment_aggregates_ds as cdp_aggs | ||
| join segments s on s.id = cdp_aggs.segmentId | ||
| where | ||
| cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) | ||
| and memberId in (select memberId from members_with_changed_aggs_previous_day) | ||
| GROUP BY grandparentId, memberId | ||
|
|
||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 30 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| NODE leaf_segment_aggs_updated_previous_day | ||
| SQL > | ||
| % | ||
| SELECT | ||
| segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| now() as updatedAt | ||
| FROM cdp_member_segment_aggregates_ds | ||
| WHERE | ||
| (memberId, segmentId) in ( | ||
| select distinct memberId, segmentId | ||
| from cdp_member_segment_aggregates_ds | ||
| where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
| ) | ||
| GROUP BY segmentId, memberId, updatedAt | ||
|
|
||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 0 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| NODE members_with_changed_aggs_previous_day | ||
| SQL > | ||
| select distinct memberId | ||
| from cdp_member_segment_aggregates_ds | ||
| where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
|
|
||
| NODE segments_with_changed_aggs_previous_day | ||
| SQL > | ||
| select id as segmentId | ||
| from segments | ||
| where | ||
| parentId in ( | ||
| select parentId | ||
| from segments | ||
| where | ||
| id in ( | ||
| select distinct segmentId | ||
| from cdp_member_segment_aggregates_ds | ||
| where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) | ||
| ) | ||
| ) | ||
|
|
||
| NODE cdp_member_aggregates_sink_daily_parent_segments_1 | ||
| SQL > | ||
| % | ||
| SELECT | ||
| parentId as segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countMerge(activityCountState) AS activityCount, | ||
| maxMerge(lastActiveState) AS lastActive, | ||
| groupArrayDistinctMerge(activityTypesState) AS activityTypes, | ||
| groupArrayDistinctMerge(activeOnState) AS activeOn, | ||
| avgMerge(averageSentimentState) AS averageSentiment, | ||
| now() AS updatedAt | ||
| FROM cdp_member_segment_aggregates_ds as cdp_aggs | ||
| join segments s on s.id = cdp_aggs.segmentId | ||
| where | ||
| cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) | ||
| and memberId in (select memberId from members_with_changed_aggs_previous_day) | ||
| GROUP BY parentId, memberId | ||
|
|
||
| TYPE SINK | ||
| EXPORT_SERVICE kafka | ||
| EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming | ||
| EXPORT_SCHEDULE 0 1 * * * | ||
| EXPORT_FORMAT csv | ||
| EXPORT_STRATEGY @new | ||
| EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| NODE cdp_member_aggregates_sink_initial_snapshot_0 | ||
| SQL > | ||
| SELECT | ||
| segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countState(activityId) AS activityCountState, | ||
| maxState(timestamp) AS lastActiveState, | ||
| groupArrayDistinctState(type) AS activityTypesState, | ||
| groupArrayDistinctState(platform) AS activeOnState, | ||
| avgState(sentimentScore) AS averageSentimentState, | ||
| maxState(updatedAt) AS lastActivityUpdatedAtState, | ||
| now64(3) as updatedAt | ||
| FROM activityRelations_enrich_snapshot_MV_ds | ||
| GROUP BY segmentId, memberId | ||
|
|
||
| TYPE MATERIALIZED | ||
| DATASOURCE cdp_member_segment_aggregates_ds |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| NODE cdp_member_aggregates_sink_initial_snapshot_0 | ||
| SQL > | ||
| SELECT | ||
| segmentId, | ||
| memberId, | ||
| '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, | ||
| countState(activityId) AS activityCountState, | ||
| maxState(timestamp) AS lastActiveState, | ||
| groupArrayDistinctState(type) AS activityTypesState, | ||
| groupArrayDistinctState(platform) AS activeOnState, | ||
| avgState(sentimentScore) AS averageSentimentState, | ||
| maxState(act.updatedAt) as lastActivityUpdatedAtState, | ||
| max(act.updatedAt) as updatedAt | ||
| FROM activityRelations_enriched_deduplicated_ds act | ||
| GROUP BY segmentId, memberId | ||
|
|
||
| TYPE COPY | ||
| TARGET_DATASOURCE cdp_member_segment_aggregates_ds | ||
| COPY_MODE replace | ||
| COPY_SCHEDULE @on-demand |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: References non-existent column
updatedAtStatein queriesThe query references
maxMerge(updatedAtState)but this column doesn't exist incdp_member_segment_aggregates_ds. The datasource schema defineslastActivityUpdatedAtState(an aggregate function) andupdatedAt(a regular timestamp), notupdatedAtState. This will cause the query to fail at runtime. The same issue occurs in all three nodes:leaf_segment_aggregates,parent_segment_aggregates, andgrandparent_segment_aggregates.Additional Locations (2)
services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe#L40-L41services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe#L69-L70