From 4357c22078350a2f0ce4b0fe2b22012c5748875e Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Dec 2025 18:41:56 +0100 Subject: [PATCH 1/2] feat: aggs processing and sinks for cdp --- ...dp_member_segment_aggregates_ds.datasource | 16 +++ ...anization_segment_aggregates_ds.datasource | 17 +++ ...ber_aggregates_bucket_backfiller_sink.pipe | 78 ++++++++++++ ...tes_changed_grandparent_segments_sink.pipe | 57 +++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 30 +++++ ...gregates_changed_parent_segments_sink.pipe | 58 +++++++++ .../cdp_member_segment_aggregates_MV.pipe | 23 ++++ ...r_segment_aggregates_initial_snapshot.pipe | 25 ++++ ...ion_aggregates_bucket_backfiller_sink.pipe | 117 ++++++++++++++++++ ...tes_changed_grandparent_segments_sink.pipe | 60 +++++++++ ...aggregates_changed_leaf_segments_sink.pipe | 32 +++++ ...gregates_changed_parent_segments_sink.pipe | 59 +++++++++ ...dp_organization_segment_aggregates_MV.pipe | 24 ++++ ...n_segment_aggregates_initial_snapshot.pipe | 26 ++++ 14 files changed, 622 insertions(+) create mode 100644 services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource create mode 100644 services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe create mode 100644 services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe create mode 100644 services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe diff --git a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..bf4729d3f3 --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource @@ -0,0 +1,16 @@ + +SCHEMA > + `segmentId` String, + `memberId` String, + `tenantId` String, + `activityCountState` AggregateFunction(count, String), + `lastActiveState` AggregateFunction(max, DateTime64(3)), + `activityTypesState` AggregateFunction(groupArrayDistinct, String), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `averageSentimentState` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYear(updatedAt)" +ENGINE_SORTING_KEY "segmentId, memberId" diff --git a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource new file mode 100644 index 0000000000..9c2a5af85b --- /dev/null +++ b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource @@ -0,0 +1,17 @@ + +SCHEMA > + `segmentId` String, + `organizationId` String, + `tenantId` String, + `joinedAtState` AggregateFunction(min, DateTime64(3)), + `lastActiveState` AggregateFunction(max, DateTime64(3)), + `activeOnState` AggregateFunction(groupArrayDistinct, String), + `activityCountState` AggregateFunction(count, String), + `memberCountState` AggregateFunction(countDistinct, String), + `avgContributorEngagement` AggregateFunction(avg, Int8), + `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), + `updatedAt` DateTime64(3) + +ENGINE "AggregatingMergeTree" +ENGINE_PARTITION_KEY "toYear(updatedAt)" +ENGINE_SORTING_KEY "segmentId, organizationId" diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..a49d64d437 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,78 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds + {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY segmentId, memberId + + + +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY parentId, memberId + + + +NODE grandparent_segment_aggregates +SQL > + + % + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + maxMerge(updatedAtState) AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} WHERE cityHash64(grandparentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + GROUP BY grandparentId, memberId + + + +NODE cdp_member_segment_aggs_union +SQL > + + select * from leaf_segment_aggregates + union all + select * from parent_segment_aggregates + union all + select * from grandparent_segment_aggregates + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..4dfce665f0 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,57 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY grandparentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 30 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..5a93af22c1 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,30 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() as updatedAt + FROM cdp_member_segment_aggregates_ds + WHERE + (memberId, segmentId) in ( + select distinct memberId, segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, memberId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..4497968df8 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,58 @@ +NODE members_with_changed_aggs_previous_day +SQL > + + select distinct memberId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_member_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countMerge(activityCountState) AS activityCount, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activityTypesState) AS activityTypes, + groupArrayDistinctMerge(activeOnState) AS activeOn, + avgMerge(averageSentimentState) AS averageSentiment, + now() AS updatedAt + FROM cdp_member_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and memberId in (select memberId from members_with_changed_aggs_previous_day) + GROUP BY parentId, memberId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink +EXPORT_SCHEDULE 0 1 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..12cfe83cbd --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe @@ -0,0 +1,23 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + avgState(sentimentScore) AS averageSentimentState, + maxState(updatedAt) AS lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY + segmentId, + memberId + +TYPE materialized +DATASOURCE cdp_member_segment_aggregates_ds + + diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..f3d5c4acf8 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,25 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + memberId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + countState(activityId) AS activityCountState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(type) AS activityTypesState, + groupArrayDistinctState(platform) AS activeOnState, + avgState(sentimentScore) AS averageSentimentState, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY + segmentId, + memberId + +TYPE copy +TARGET_DATASOURCE cdp_member_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe new file mode 100644 index 0000000000..792438bed4 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -0,0 +1,117 @@ +NODE leaf_segment_aggregates +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY segmentId, organizationId + + + +NODE parent_segment_aggregates +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(segmentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY parentId, organizationId + + + +NODE grandparent_segment_aggregates +SQL > + + % + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + round(avgMerge(avgContributorEngagement)) AS avgContributorEngagement, + max(updatedAt) AS updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + {% if defined(bucket_id) %} + WHERE + organizationId <> '' + AND cityHash64(grandparentId) % 3 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} + GROUP BY grandparentId, organizationId + + + +NODE cdp_organization_segment_aggs_union +SQL > + + select * from leaf_segment_aggregates + union all + select * from parent_segment_aggregates + union all + select * from grandparent_segment_aggregates + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE @on-demand + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe new file mode 100644 index 0000000000..a7c23753b5 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -0,0 +1,60 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where + organizationId <> '' + and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + grandparentId in ( + select grandparentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE grandparent_segment_aggs_updated_previous_day +SQL > + + SELECT + grandparentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY grandparentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 30 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe new file mode 100644 index 0000000000..dfd2f797a0 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -0,0 +1,32 @@ +NODE leaf_segment_aggs_updated_previous_day +SQL > + + % + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds + WHERE + organizationId <> '' + AND (organizationId, segmentId) in ( + select distinct organizationId, segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + ) + GROUP BY segmentId, organizationId, updatedAt + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 0 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe new file mode 100644 index 0000000000..133fb970e9 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -0,0 +1,59 @@ +NODE organizations_with_changed_aggs_previous_day +SQL > + + select distinct organizationId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + + +NODE segments_with_changed_aggs_previous_day +SQL > + + select id as segmentId + from segments + where + parentId in ( + select parentId + from segments + where + id in ( + select distinct segmentId + from cdp_organization_segment_aggregates_ds + where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) + + ) + + ) + + + +NODE cdp_member_aggregates_sink_daily_parent_segments_1 +SQL > + + % + SELECT + parentId as segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minMerge(joinedAtState) as joinedAt, + maxMerge(lastActiveState) AS lastActive, + groupArrayDistinctMerge(activeOnState) AS activeOn, + countMerge(activityCountState) AS activityCount, + countDistinctMerge(memberCountState) as memberCount, + avgMerge(avgContributorEngagement) AS avgContributorEngagement, + now() as updatedAt + FROM cdp_organization_segment_aggregates_ds as cdp_aggs + join segments s on s.id = cdp_aggs.segmentId + where + cdp_aggs.segmentId in (select segmentId from segments_with_changed_aggs_previous_day) + and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) + GROUP BY parentId, organizationId + +TYPE sink +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink +EXPORT_SCHEDULE 0 2 * * * + + diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe new file mode 100644 index 0000000000..a5a2bada28 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe @@ -0,0 +1,24 @@ +NODE cdp_member_aggregates_sink_initial_snapshot_0 +SQL > + + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(updatedAt) as lastActivityUpdatedAtState, + now64(3) as updatedAt + FROM activityRelations_enrich_snapshot_MV_ds + GROUP BY + segmentId, + organizationId + +TYPE materialized +DATASOURCE cdp_organization_segment_aggregates_ds + + diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe new file mode 100644 index 0000000000..0b9a3a11b1 --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe @@ -0,0 +1,26 @@ +NODE cdp_org_aggregates_sink_initial_snapshot +SQL > + + SELECT + segmentId, + organizationId, + '875c38bd-2b1b-4e91-ad07-0cfbabb4c49f' AS tenantId, + minState(timestamp) as joinedAtState, + maxState(timestamp) AS lastActiveState, + groupArrayDistinctState(platform) AS activeOnState, + countState(activityId) AS activityCountState, + countDistinctState(memberId) as memberCountState, + avgState(score) AS avgContributorEngagement, + maxState(act.updatedAt) as lastActivityUpdatedAtState, + max(act.updatedAt) as updatedAt + FROM activityRelations_enriched_deduplicated_ds act + GROUP BY + segmentId, + organizationId + +TYPE copy +TARGET_DATASOURCE cdp_organization_segment_aggregates_ds +COPY_MODE replace +COPY_SCHEDULE @on-demand + + From f3e7aa15c944c36a370e6ca5a6ae72b183e4165c Mon Sep 17 00:00:00 2001 From: anilb Date: Fri, 19 Dec 2025 18:47:02 +0100 Subject: [PATCH 2/2] chore: formatting new resources --- ...dp_member_segment_aggregates_ds.datasource | 7 +- ...anization_segment_aggregates_ds.datasource | 7 +- ...ber_aggregates_bucket_backfiller_sink.pipe | 70 +++++++++++++------ ...tes_changed_grandparent_segments_sink.pipe | 17 ++--- ...aggregates_changed_leaf_segments_sink.pipe | 9 ++- ...gregates_changed_parent_segments_sink.pipe | 17 ++--- .../cdp_member_segment_aggregates_MV.pipe | 9 +-- ...r_segment_aggregates_initial_snapshot.pipe | 9 +-- ...ion_aggregates_bucket_backfiller_sink.pipe | 27 +++---- ...tes_changed_grandparent_segments_sink.pipe | 17 ++--- ...aggregates_changed_leaf_segments_sink.pipe | 9 ++- ...gregates_changed_parent_segments_sink.pipe | 17 ++--- ...dp_organization_segment_aggregates_MV.pipe | 9 +-- ...n_segment_aggregates_initial_snapshot.pipe | 9 +-- 14 files changed, 96 insertions(+), 137 deletions(-) diff --git a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource index bf4729d3f3..3ee3f235e1 100644 --- a/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource +++ b/services/libs/tinybird/datasources/cdp_member_segment_aggregates_ds.datasource @@ -1,4 +1,3 @@ - SCHEMA > `segmentId` String, `memberId` String, @@ -11,6 +10,6 @@ SCHEMA > `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), `updatedAt` DateTime64(3) -ENGINE "AggregatingMergeTree" -ENGINE_PARTITION_KEY "toYear(updatedAt)" -ENGINE_SORTING_KEY "segmentId, memberId" +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, memberId diff --git a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource index 9c2a5af85b..9ad5d7671e 100644 --- a/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource +++ b/services/libs/tinybird/datasources/cdp_organization_segment_aggregates_ds.datasource @@ -1,4 +1,3 @@ - SCHEMA > `segmentId` String, `organizationId` String, @@ -12,6 +11,6 @@ SCHEMA > `lastActivityUpdatedAtState` AggregateFunction(max, DateTime64(3)), `updatedAt` DateTime64(3) -ENGINE "AggregatingMergeTree" -ENGINE_PARTITION_KEY "toYear(updatedAt)" -ENGINE_SORTING_KEY "segmentId, organizationId" +ENGINE AggregatingMergeTree +ENGINE_PARTITION_KEY toYear(updatedAt) +ENGINE_SORTING_KEY segmentId, organizationId diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe index a49d64d437..bb177f85c9 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_bucket_backfiller_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggregates SQL > - % SELECT segmentId, @@ -11,16 +10,24 @@ SQL > groupArrayDistinctMerge(activityTypesState) AS activityTypes, groupArrayDistinctMerge(activeOnState) AS activeOn, avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt + maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds - {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY segmentId, memberId - - NODE parent_segment_aggregates SQL > - % SELECT parentId as segmentId, @@ -31,17 +38,25 @@ SQL > groupArrayDistinctMerge(activityTypesState) AS activityTypes, groupArrayDistinctMerge(activeOnState) AS activeOn, avgMerge(averageSentimentState) AS averageSentiment, - maxMerge(updatedAtState) AS updatedAt + maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds as cdp_aggs join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} WHERE cityHash64(segmentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(segmentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY parentId, memberId - - NODE grandparent_segment_aggregates SQL > - % SELECT grandparentId as segmentId, @@ -55,24 +70,35 @@ SQL > maxMerge(updatedAtState) AS updatedAt FROM cdp_member_segment_aggregates_ds as cdp_aggs join segments s on s.id = cdp_aggs.segmentId - {% if defined(bucket_id) %} WHERE cityHash64(grandparentId) % 10 = {{ UInt8(bucket_id, 0, description="This is bucket id of the activity segment", required=False)}} {% end %} + {% if defined(bucket_id) %} + WHERE + cityHash64(grandparentId) % 10 + = {{ + UInt8( + bucket_id, + 0, + description="This is bucket id of the activity segment", + required=False, + ) + }} + {% end %} GROUP BY grandparentId, memberId - - NODE cdp_member_segment_aggs_union SQL > - - select * from leaf_segment_aggregates + select * + from leaf_segment_aggregates union all - select * from parent_segment_aggregates + select * + from parent_segment_aggregates union all - select * from grandparent_segment_aggregates + select * + from grandparent_segment_aggregates -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink EXPORT_SCHEDULE @on-demand - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAggs_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe index 4dfce665f0..709bc09d5a 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_grandparent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE members_with_changed_aggs_previous_day SQL > - select distinct memberId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE grandparent_segment_aggs_updated_previous_day SQL > - SELECT grandparentId as segmentId, memberId, @@ -48,10 +39,10 @@ SQL > and memberId in (select memberId from members_with_changed_aggs_previous_day) GROUP BY grandparentId, memberId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 30 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe index 5a93af22c1..5eaa3a7ae1 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_leaf_segments_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggs_updated_previous_day SQL > - % SELECT segmentId, @@ -21,10 +20,10 @@ SQL > ) GROUP BY segmentId, memberId, updatedAt -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 0 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe index 4497968df8..6e3ff065c2 100644 --- a/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_member_aggregates_changed_parent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE members_with_changed_aggs_previous_day SQL > - select distinct memberId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_member_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE cdp_member_aggregates_sink_daily_parent_segments_1 SQL > - % SELECT parentId as segmentId, @@ -49,10 +40,10 @@ SQL > and memberId in (select memberId from members_with_changed_aggs_previous_day) GROUP BY parentId, memberId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink EXPORT_SCHEDULE 0 1 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC memberSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe index 12cfe83cbd..3a71264cb6 100644 --- a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_MV.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, memberId, @@ -13,11 +12,7 @@ SQL > maxState(updatedAt) AS lastActivityUpdatedAtState, now64(3) as updatedAt FROM activityRelations_enrich_snapshot_MV_ds - GROUP BY - segmentId, - memberId + GROUP BY segmentId, memberId -TYPE materialized +TYPE MATERIALIZED DATASOURCE cdp_member_segment_aggregates_ds - - diff --git a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe index f3d5c4acf8..b6a69804fd 100644 --- a/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe +++ b/services/libs/tinybird/pipes/cdp_member_segment_aggregates_initial_snapshot.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, memberId, @@ -13,13 +12,9 @@ SQL > maxState(act.updatedAt) as lastActivityUpdatedAtState, max(act.updatedAt) as updatedAt FROM activityRelations_enriched_deduplicated_ds act - GROUP BY - segmentId, - memberId + GROUP BY segmentId, memberId -TYPE copy +TYPE COPY TARGET_DATASOURCE cdp_member_segment_aggregates_ds COPY_MODE replace COPY_SCHEDULE @on-demand - - diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe index 792438bed4..710e2c56b5 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_bucket_backfiller_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggregates SQL > - % SELECT segmentId, @@ -29,11 +28,8 @@ SQL > {% end %} GROUP BY segmentId, organizationId - - NODE parent_segment_aggregates SQL > - % SELECT parentId as segmentId, @@ -63,11 +59,8 @@ SQL > {% end %} GROUP BY parentId, organizationId - - NODE grandparent_segment_aggregates SQL > - % SELECT grandparentId as segmentId, @@ -97,21 +90,21 @@ SQL > {% end %} GROUP BY grandparentId, organizationId - - NODE cdp_organization_segment_aggs_union SQL > - - select * from leaf_segment_aggregates + select * + from leaf_segment_aggregates union all - select * from parent_segment_aggregates + select * + from parent_segment_aggregates union all - select * from grandparent_segment_aggregates + select * + from grandparent_segment_aggregates -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE @on-demand - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe index a7c23753b5..e8f8b77256 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_grandparent_segments_sink.pipe @@ -1,17 +1,13 @@ NODE organizations_with_changed_aggs_previous_day SQL > - select distinct organizationId from cdp_organization_segment_aggregates_ds where organizationId <> '' and updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -23,16 +19,11 @@ SQL > select distinct segmentId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE grandparent_segment_aggs_updated_previous_day SQL > - SELECT grandparentId as segmentId, organizationId, @@ -51,10 +42,10 @@ SQL > and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) GROUP BY grandparentId, organizationId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 30 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe index dfd2f797a0..e39d226927 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_leaf_segments_sink.pipe @@ -1,6 +1,5 @@ NODE leaf_segment_aggs_updated_previous_day SQL > - % SELECT segmentId, @@ -23,10 +22,10 @@ SQL > ) GROUP BY segmentId, organizationId, updatedAt -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 0 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe index 133fb970e9..bd8cb4dc36 100644 --- a/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_aggregates_changed_parent_segments_sink.pipe @@ -1,15 +1,11 @@ NODE organizations_with_changed_aggs_previous_day SQL > - select distinct organizationId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - - NODE segments_with_changed_aggs_previous_day SQL > - select id as segmentId from segments where @@ -21,16 +17,11 @@ SQL > select distinct segmentId from cdp_organization_segment_aggregates_ds where updatedAt >= toStartOfDay(toTimeZone(now(), 'Europe/Berlin') - INTERVAL 1 DAY) - ) - ) - - NODE cdp_member_aggregates_sink_daily_parent_segments_1 SQL > - % SELECT parentId as segmentId, @@ -50,10 +41,10 @@ SQL > and organizationId in (select organizationId from organizations_with_changed_aggs_previous_day) GROUP BY parentId, organizationId -TYPE sink +TYPE SINK EXPORT_SERVICE kafka EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming -EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink EXPORT_SCHEDULE 0 2 * * * - - +EXPORT_FORMAT csv +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC organizationSegmentsAgg_sink diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe index a5a2bada28..93a5a3e4b5 100644 --- a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_MV.pipe @@ -1,6 +1,5 @@ NODE cdp_member_aggregates_sink_initial_snapshot_0 SQL > - SELECT segmentId, organizationId, @@ -14,11 +13,7 @@ SQL > maxState(updatedAt) as lastActivityUpdatedAtState, now64(3) as updatedAt FROM activityRelations_enrich_snapshot_MV_ds - GROUP BY - segmentId, - organizationId + GROUP BY segmentId, organizationId -TYPE materialized +TYPE MATERIALIZED DATASOURCE cdp_organization_segment_aggregates_ds - - diff --git a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe index 0b9a3a11b1..e1e84adaed 100644 --- a/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe +++ b/services/libs/tinybird/pipes/cdp_organization_segment_aggregates_initial_snapshot.pipe @@ -1,6 +1,5 @@ NODE cdp_org_aggregates_sink_initial_snapshot SQL > - SELECT segmentId, organizationId, @@ -14,13 +13,9 @@ SQL > maxState(act.updatedAt) as lastActivityUpdatedAtState, max(act.updatedAt) as updatedAt FROM activityRelations_enriched_deduplicated_ds act - GROUP BY - segmentId, - organizationId + GROUP BY segmentId, organizationId -TYPE copy +TYPE COPY TARGET_DATASOURCE cdp_organization_segment_aggregates_ds COPY_MODE replace COPY_SCHEDULE @on-demand - -