From 53870cecf33d5f472d6d590d29f71932c4cf63d9 Mon Sep 17 00:00:00 2001 From: arbiv Date: Mon, 20 Oct 2025 17:48:13 +0300 Subject: [PATCH 01/18] Add exclude_detection_period_from_training flag to volume_anomalies test - Add optional exclude_detection_period_from_training parameter to test_volume_anomalies.sql - Pass parameter through test_table_anomalies.sql to get_anomalies_test_configuration.sql - Add parameter handling and configuration in get_anomalies_test_configuration.sql - Implement core exclusion logic in get_anomaly_scores_query.sql: - Calculate detection_period_start boundary based on backfill_days - Add is_detection_period flag to grouped_metrics CTE - Exclude detection period metrics from time_window_aggregation training - Flag defaults to false to preserve backward compatibility - Enables clean separation between training and detection data for improved anomaly detection accuracy --- .../tests/test_volume_anomalies.py | 86 +++++++++++++++++++ .../get_anomaly_scores_query.sql | 13 +++ .../get_anomalies_test_configuration.sql | 7 +- macros/edr/tests/test_table_anomalies.sql | 5 +- macros/edr/tests/test_volume_anomalies.sql | 5 +- 5 files changed, 110 insertions(+), 6 deletions(-) diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 7f01091fe..4a322d731 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -534,3 +534,89 @@ def test_anomalyless_vol_anomalies_with_test_materialization( test_vars={"enable_elementary_test_materialization": True}, ) assert test_result["status"] == "pass" + + +# Test for exclude_detection_period_from_training functionality +# This test demonstrates the use case where: +# 1. Detection period contains anomalous data that would normally be included in training +# 2. With exclude_detection_period_from_training=False: anomaly is missed (test passes) because training includes the anomaly +# 3. With exclude_detection_period_from_training=True: anomaly is detected (test fails) because training excludes the anomaly +@pytest.mark.skip_targets(["clickhouse"]) +def test_exclude_detection_period_from_training_use_case( + test_id: str, dbt_project: DbtProject +): + """ + Test the exclude_detection_period_from_training flag functionality. + + Scenario: + - 30 days of normal data (100 rows per day) + - 7 days of anomalous data (500 rows per day) in detection period + - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) + - With exclusion: anomaly excluded from training, test fails (detects anomaly) + """ + utc_now = datetime.utcnow() + + # Generate 30 days of normal data (100 rows per day) + normal_data = [] + for i in range(30): + date = utc_now - timedelta(days=30 - i) + normal_data.extend( + [ + {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} + for _ in range(100) # 100 rows per day + ] + ) + + # Generate 7 days of anomalous data (500 rows per day) - this will be in detection period + anomalous_data = [] + for i in range(7): + date = utc_now - timedelta(days=7 - i) + anomalous_data.extend( + [ + {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} + for _ in range(500) # 500 rows per day - 5x normal volume + ] + ) + + # Combine all data + all_data = normal_data + anomalous_data + + # Test 1: WITHOUT exclusion (should pass - misses the anomaly because it's included in training) + test_args_without_exclusion = { + **DBT_TEST_ARGS, + "training_period": {"period": "day", "count": 30}, + "detection_period": {"period": "day", "count": 7}, + "time_bucket": {"period": "day", "count": 1}, + "anomaly_sensitivity": 3, # High sensitivity to catch the 5x spike + # exclude_detection_period_from_training is not set (defaults to False/None) + } + + test_result_without_exclusion = dbt_project.test( + test_id + "_without_exclusion", + DBT_TEST_NAME, + test_args_without_exclusion, + data=all_data, + ) + + # This should PASS because the anomaly is included in training, making it part of the baseline + assert ( + test_result_without_exclusion["status"] == "pass" + ), "Test should pass when anomaly is included in training" + + # Test 2: WITH exclusion (should fail - detects the anomaly because it's excluded from training) + test_args_with_exclusion = { + **test_args_without_exclusion, + "exclude_detection_period_from_training": True, # NEW FLAG + } + + test_result_with_exclusion = dbt_project.test( + test_id + "_with_exclusion", + DBT_TEST_NAME, + test_args_with_exclusion, + data=all_data, + ) + + # This should FAIL because the anomaly is excluded from training, so it's detected as anomalous + assert ( + test_result_with_exclusion["status"] == "fail" + ), "Test should fail when anomaly is excluded from training" diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 4819ae772..3cc126485 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -31,6 +31,12 @@ {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} + {# NEW: Calculate detection period start for exclusion logic #} + {%- if test_configuration.exclude_detection_period_from_training %} + {%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %} + {%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %} + {%- endif %} + {# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the bucket end (which is the actual time of the test) #} {%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %} @@ -142,6 +148,12 @@ bucket_end, {{ bucket_seasonality_expr }} as bucket_seasonality, {{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded, + {# NEW: Flag detection period metrics for exclusion from training #} + {% if test_configuration.exclude_detection_period_from_training %} + bucket_end > {{ detection_period_start_expr }} + {% else %} + FALSE + {% endif %} as is_detection_period, bucket_duration_hours, updated_at from grouped_metrics_duplicates @@ -171,6 +183,7 @@ first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start from grouped_metrics where not is_excluded + and not is_detection_period {# NEW: Exclude detection period from training #} {{ dbt_utils.group_by(13) }} ), diff --git a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql index fa9d061e6..41d338f62 100644 --- a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql +++ b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql @@ -23,7 +23,8 @@ anomaly_exclude_metrics, detection_period, training_period, - exclude_final_results) %} + exclude_final_results, + exclude_detection_period_from_training) %} {%- set model_graph_node = elementary.get_model_graph_node(model_relation) %} {# Changes in these configs impact the metric id of the test. #} @@ -53,6 +54,7 @@ {% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %} {% set exclude_final_results = elementary.get_exclude_final_results(exclude_final_results) %} + {% set exclude_detection_period_from_training = elementary.get_test_argument('exclude_detection_period_from_training', exclude_detection_period_from_training, model_graph_node) %} {% set test_configuration = {'timestamp_column': metric_props.timestamp_column, @@ -71,7 +73,8 @@ 'fail_on_zero': fail_on_zero, 'detection_delay': detection_delay, 'anomaly_exclude_metrics': anomaly_exclude_metrics, - 'exclude_final_results': exclude_final_results + 'exclude_final_results': exclude_final_results, + 'exclude_detection_period_from_training': exclude_detection_period_from_training } %} {%- set test_configuration = elementary.undefined_dict_keys_to_none(test_configuration) -%} {%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%} diff --git a/macros/edr/tests/test_table_anomalies.sql b/macros/edr/tests/test_table_anomalies.sql index 662bbc269..db7e41fe5 100644 --- a/macros/edr/tests/test_table_anomalies.sql +++ b/macros/edr/tests/test_table_anomalies.sql @@ -1,4 +1,4 @@ -{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %} +{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=none) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} @@ -37,7 +37,8 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period) %} + training_period=training_period, + exclude_detection_period_from_training=exclude_detection_period_from_training) %} {% if not test_configuration %} {{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }} diff --git a/macros/edr/tests/test_volume_anomalies.sql b/macros/edr/tests/test_volume_anomalies.sql index 1c672522f..b96bda4d4 100644 --- a/macros/edr/tests/test_volume_anomalies.sql +++ b/macros/edr/tests/test_volume_anomalies.sql @@ -1,4 +1,4 @@ -{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} +{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} {{ config(tags = ['elementary-tests']) }} {{ elementary.test_table_anomalies( @@ -20,7 +20,8 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period + training_period=training_period, + exclude_detection_period_from_training=exclude_detection_period_from_training ) }} {% endtest %} From d3a859758c26838d763b1b7d04db28af47a009ac Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 05:54:44 +0000 Subject: [PATCH 02/18] Fix test values for exclude_detection_period_from_training test - Change anomalous data from 500 to 110 rows/day (10% increase instead of 5x) - Change sensitivity from 3 to 10 to demonstrate masking effect - Fix parameter name from anomaly_sensitivity to sensitivity - This ensures test passes when anomaly is included in training and fails when excluded Co-Authored-By: Yosef Arbiv --- integration_tests/tests/test_volume_anomalies.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 4a322d731..c3307baee 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -550,7 +550,7 @@ def test_exclude_detection_period_from_training_use_case( Scenario: - 30 days of normal data (100 rows per day) - - 7 days of anomalous data (500 rows per day) in detection period + - 7 days of anomalous data (110 rows per day) in detection period - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - With exclusion: anomaly excluded from training, test fails (detects anomaly) """ @@ -567,14 +567,14 @@ def test_exclude_detection_period_from_training_use_case( ] ) - # Generate 7 days of anomalous data (500 rows per day) - this will be in detection period + # Generate 7 days of anomalous data (110 rows per day) - this will be in detection period anomalous_data = [] for i in range(7): date = utc_now - timedelta(days=7 - i) anomalous_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for _ in range(500) # 500 rows per day - 5x normal volume + for _ in range(110) # 110 rows per day - 10% increase ] ) @@ -587,7 +587,7 @@ def test_exclude_detection_period_from_training_use_case( "training_period": {"period": "day", "count": 30}, "detection_period": {"period": "day", "count": 7}, "time_bucket": {"period": "day", "count": 1}, - "anomaly_sensitivity": 3, # High sensitivity to catch the 5x spike + "sensitivity": 10, # High sensitivity to demonstrate masking effect # exclude_detection_period_from_training is not set (defaults to False/None) } From 8a409d1f3338908a2a596f9798376775a6be5946 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 08:23:39 +0000 Subject: [PATCH 03/18] Add exclude_detection_period_from_training flag to freshness tests - Add parameter to test_freshness_anomalies.sql and test_event_freshness_anomalies.sql - Add test case for freshness anomalies with exclude_detection_period_from_training flag - Add test case for event freshness anomalies with exclude_detection_period_from_training flag - Tests validate that anomalies are missed when included in training and detected when excluded Co-Authored-By: Yosef Arbiv --- .../dbt_project/package-lock.yml | 7 ++ .../tests/test_event_freshness_anomalies.py | 80 +++++++++++++++++++ .../tests/test_freshness_anomalies.py | 75 +++++++++++++++++ .../tests/test_event_freshness_anomalies.sql | 5 +- macros/edr/tests/test_freshness_anomalies.sql | 5 +- 5 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 integration_tests/dbt_project/package-lock.yml diff --git a/integration_tests/dbt_project/package-lock.yml b/integration_tests/dbt_project/package-lock.yml new file mode 100644 index 000000000..db4aea8a9 --- /dev/null +++ b/integration_tests/dbt_project/package-lock.yml @@ -0,0 +1,7 @@ +packages: + - local: ../../ + name: elementary + - name: dbt_utils + package: dbt-labs/dbt_utils + version: 1.3.0 +sha1_hash: 31ab6d0c0f2c12e5eba02f23dfdb5ad74be70ced diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py index 607c61271..30ea463bb 100644 --- a/integration_tests/tests/test_event_freshness_anomalies.py +++ b/integration_tests/tests/test_event_freshness_anomalies.py @@ -88,3 +88,83 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject): test_vars={"custom_run_started_at": test_started_at.isoformat()}, ) assert result["status"] == "fail" + + +# Anomalies currently not supported on ClickHouse +@pytest.mark.skip_targets(["clickhouse"]) +def test_exclude_detection_period_from_training_event_freshness( + test_id: str, dbt_project: DbtProject +): + """ + Test the exclude_detection_period_from_training flag for event freshness anomalies. + + Scenario: + - 30 days of normal data (event lag of 1 hour) + - 7 days of anomalous data (event lag of 1.1 hours - 10% slower) in detection period + - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) + - With exclusion: anomaly excluded from training, test fails (detects anomaly) + """ + now = datetime.now() + + normal_data = [ + { + EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT), + UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1)).strftime(DATE_FORMAT), + } + for date in generate_dates( + base_date=now - timedelta(days=37), step=timedelta(hours=2), days_back=30 + ) + ] + + # Generate 7 days of anomalous data (event lag of 1.1 hours - 10% slower) + anomalous_data = [ + { + EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT), + UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1, minutes=6)).strftime( + DATE_FORMAT + ), + } + for date in generate_dates( + base_date=now - timedelta(days=7), step=timedelta(hours=2), days_back=7 + ) + ] + + all_data = normal_data + anomalous_data + + # Test 1: WITHOUT exclusion (should pass - misses the anomaly) + test_args_without_exclusion = { + "event_timestamp_column": EVENT_TIMESTAMP_COLUMN, + "update_timestamp_column": UPDATE_TIMESTAMP_COLUMN, + "training_period": {"period": "day", "count": 30}, + "detection_period": {"period": "day", "count": 7}, + "time_bucket": {"period": "day", "count": 1}, + "sensitivity": 10, + } + + test_result_without_exclusion = dbt_project.test( + test_id + "_without_exclusion", + TEST_NAME, + test_args_without_exclusion, + data=all_data, + ) + + assert ( + test_result_without_exclusion["status"] == "pass" + ), "Test should pass when anomaly is included in training" + + # Test 2: WITH exclusion (should fail - detects the anomaly) + test_args_with_exclusion = { + **test_args_without_exclusion, + "exclude_detection_period_from_training": True, + } + + test_result_with_exclusion = dbt_project.test( + test_id + "_with_exclusion", + TEST_NAME, + test_args_with_exclusion, + data=all_data, + ) + + assert ( + test_result_with_exclusion["status"] == "fail" + ), "Test should fail when anomaly is excluded from training" diff --git a/integration_tests/tests/test_freshness_anomalies.py b/integration_tests/tests/test_freshness_anomalies.py index b53f31d3b..c59fda923 100644 --- a/integration_tests/tests/test_freshness_anomalies.py +++ b/integration_tests/tests/test_freshness_anomalies.py @@ -233,3 +233,78 @@ def test_first_metric_null(test_id, dbt_project: DbtProject): materialization="incremental", ) assert result["status"] == "pass" + + +# Anomalies currently not supported on ClickHouse +@pytest.mark.skip_targets(["clickhouse"]) +def test_exclude_detection_period_from_training_freshness( + test_id: str, dbt_project: DbtProject +): + """ + Test the exclude_detection_period_from_training flag for freshness anomalies. + + Scenario: + - 30 days of normal data (updates every 10 minutes) + - 7 days of anomalous data (updates every 11 minutes - 10% slower) in detection period + - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) + - With exclusion: anomaly excluded from training, test fails (detects anomaly) + """ + now = datetime.now() + + normal_data = [ + {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} + for date in generate_dates( + base_date=now - timedelta(days=37), + step=timedelta(minutes=10), + days_back=30, + ) + ] + + # Generate 7 days of anomalous data (updates every 11 minutes - 10% slower) + anomalous_data = [ + {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} + for date in generate_dates( + base_date=now - timedelta(days=7), + step=timedelta(minutes=11), + days_back=7, + ) + ] + + all_data = normal_data + anomalous_data + + # Test 1: WITHOUT exclusion (should pass - misses the anomaly) + test_args_without_exclusion = { + "timestamp_column": TIMESTAMP_COLUMN, + "training_period": {"period": "day", "count": 30}, + "detection_period": {"period": "day", "count": 7}, + "time_bucket": {"period": "day", "count": 1}, + "sensitivity": 10, + } + + test_result_without_exclusion = dbt_project.test( + test_id + "_without_exclusion", + TEST_NAME, + test_args_without_exclusion, + data=all_data, + ) + + assert ( + test_result_without_exclusion["status"] == "pass" + ), "Test should pass when anomaly is included in training" + + # Test 2: WITH exclusion (should fail - detects the anomaly) + test_args_with_exclusion = { + **test_args_without_exclusion, + "exclude_detection_period_from_training": True, + } + + test_result_with_exclusion = dbt_project.test( + test_id + "_with_exclusion", + TEST_NAME, + test_args_with_exclusion, + data=all_data, + ) + + assert ( + test_result_with_exclusion["status"] == "fail" + ), "Test should fail when anomaly is excluded from training" diff --git a/macros/edr/tests/test_event_freshness_anomalies.sql b/macros/edr/tests/test_event_freshness_anomalies.sql index 0f6b1af05..694e9cd6c 100644 --- a/macros/edr/tests/test_event_freshness_anomalies.sql +++ b/macros/edr/tests/test_event_freshness_anomalies.sql @@ -1,4 +1,4 @@ -{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} +{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} {{ config(tags = ['elementary-tests']) }} {% if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} @@ -32,7 +32,8 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period + training_period=training_period, + exclude_detection_period_from_training=exclude_detection_period_from_training ) }} {% endtest %} diff --git a/macros/edr/tests/test_freshness_anomalies.sql b/macros/edr/tests/test_freshness_anomalies.sql index abba9b4fc..9e3139e18 100644 --- a/macros/edr/tests/test_freshness_anomalies.sql +++ b/macros/edr/tests/test_freshness_anomalies.sql @@ -1,4 +1,4 @@ -{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} +{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} {{ config(tags = ['elementary-tests']) }} {{ elementary.test_table_anomalies( model=model, @@ -18,7 +18,8 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period + training_period=training_period, + exclude_detection_period_from_training=exclude_detection_period_from_training ) }} {% endtest %} From 50f9e61bc551162557988cbf660818c8c786b9cc Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 08:45:47 +0000 Subject: [PATCH 04/18] Adjust test values to make anomalies more detectable - Change volume anomalies from 110 to 150 rows/day (50% increase) - Change freshness anomalies from 11 to 15 minutes (50% slower) - Change event freshness anomalies from 1.1 to 1.5 hours lag (50% slower) - Reduce sensitivity from 10 to 3 for all tests - This ensures anomalies are detected when excluded from training Co-Authored-By: Yosef Arbiv --- integration_tests/tests/test_event_freshness_anomalies.py | 8 ++++---- integration_tests/tests/test_freshness_anomalies.py | 8 ++++---- integration_tests/tests/test_volume_anomalies.py | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py index 30ea463bb..1b99792db 100644 --- a/integration_tests/tests/test_event_freshness_anomalies.py +++ b/integration_tests/tests/test_event_freshness_anomalies.py @@ -100,7 +100,7 @@ def test_exclude_detection_period_from_training_event_freshness( Scenario: - 30 days of normal data (event lag of 1 hour) - - 7 days of anomalous data (event lag of 1.1 hours - 10% slower) in detection period + - 7 days of anomalous data (event lag of 1.5 hours - 50% slower) in detection period - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - With exclusion: anomaly excluded from training, test fails (detects anomaly) """ @@ -116,11 +116,11 @@ def test_exclude_detection_period_from_training_event_freshness( ) ] - # Generate 7 days of anomalous data (event lag of 1.1 hours - 10% slower) + # Generate 7 days of anomalous data (event lag of 1.5 hours - 50% slower) anomalous_data = [ { EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT), - UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1, minutes=6)).strftime( + UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1, minutes=30)).strftime( DATE_FORMAT ), } @@ -138,7 +138,7 @@ def test_exclude_detection_period_from_training_event_freshness( "training_period": {"period": "day", "count": 30}, "detection_period": {"period": "day", "count": 7}, "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 10, + "sensitivity": 3, } test_result_without_exclusion = dbt_project.test( diff --git a/integration_tests/tests/test_freshness_anomalies.py b/integration_tests/tests/test_freshness_anomalies.py index c59fda923..2202e0d76 100644 --- a/integration_tests/tests/test_freshness_anomalies.py +++ b/integration_tests/tests/test_freshness_anomalies.py @@ -245,7 +245,7 @@ def test_exclude_detection_period_from_training_freshness( Scenario: - 30 days of normal data (updates every 10 minutes) - - 7 days of anomalous data (updates every 11 minutes - 10% slower) in detection period + - 7 days of anomalous data (updates every 15 minutes - 50% slower) in detection period - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - With exclusion: anomaly excluded from training, test fails (detects anomaly) """ @@ -260,12 +260,12 @@ def test_exclude_detection_period_from_training_freshness( ) ] - # Generate 7 days of anomalous data (updates every 11 minutes - 10% slower) + # Generate 7 days of anomalous data (updates every 15 minutes - 50% slower) anomalous_data = [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} for date in generate_dates( base_date=now - timedelta(days=7), - step=timedelta(minutes=11), + step=timedelta(minutes=15), days_back=7, ) ] @@ -278,7 +278,7 @@ def test_exclude_detection_period_from_training_freshness( "training_period": {"period": "day", "count": 30}, "detection_period": {"period": "day", "count": 7}, "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 10, + "sensitivity": 3, } test_result_without_exclusion = dbt_project.test( diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index c3307baee..0acc8e3c4 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -550,7 +550,7 @@ def test_exclude_detection_period_from_training_use_case( Scenario: - 30 days of normal data (100 rows per day) - - 7 days of anomalous data (110 rows per day) in detection period + - 7 days of anomalous data (150 rows per day) in detection period - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - With exclusion: anomaly excluded from training, test fails (detects anomaly) """ @@ -567,14 +567,14 @@ def test_exclude_detection_period_from_training_use_case( ] ) - # Generate 7 days of anomalous data (110 rows per day) - this will be in detection period + # Generate 7 days of anomalous data (150 rows per day) - this will be in detection period anomalous_data = [] for i in range(7): date = utc_now - timedelta(days=7 - i) anomalous_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for _ in range(110) # 110 rows per day - 10% increase + for _ in range(150) # 150 rows per day - 50% increase ] ) @@ -587,7 +587,7 @@ def test_exclude_detection_period_from_training_use_case( "training_period": {"period": "day", "count": 30}, "detection_period": {"period": "day", "count": 7}, "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 10, # High sensitivity to demonstrate masking effect + "sensitivity": 3, # Standard sensitivity # exclude_detection_period_from_training is not set (defaults to False/None) } From 2805b510533304160b81c527b8d4ea258fdc5992 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:19:39 +0000 Subject: [PATCH 05/18] Fix test failures: add exclude_detection_period_from_training to expected config and fix date overlap - Add exclude_detection_period_from_training: None to test_anomaly_test_configuration expected config - Fix date overlap in test_exclude_detection_period_from_training_use_case (days 37-8 for training, days 7-1 for detection) Co-Authored-By: Yosef Arbiv --- integration_tests/tests/test_anomaly_test_configuration.py | 1 + integration_tests/tests/test_volume_anomalies.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/integration_tests/tests/test_anomaly_test_configuration.py b/integration_tests/tests/test_anomaly_test_configuration.py index 10cc72688..b91507fde 100644 --- a/integration_tests/tests/test_anomaly_test_configuration.py +++ b/integration_tests/tests/test_anomaly_test_configuration.py @@ -88,6 +88,7 @@ def get_value(key: str): "freshness_column": None, # Deprecated "dimensions": None, # should only be set at the test level, "exclude_final_results": get_value("exclude_final_results"), + "exclude_detection_period_from_training": None, } diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 0acc8e3c4..8a4e63e6f 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -559,7 +559,7 @@ def test_exclude_detection_period_from_training_use_case( # Generate 30 days of normal data (100 rows per day) normal_data = [] for i in range(30): - date = utc_now - timedelta(days=30 - i) + date = utc_now - timedelta(days=37 - i) # Days 37 to 8 ago normal_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} @@ -570,7 +570,7 @@ def test_exclude_detection_period_from_training_use_case( # Generate 7 days of anomalous data (150 rows per day) - this will be in detection period anomalous_data = [] for i in range(7): - date = utc_now - timedelta(days=7 - i) + date = utc_now - timedelta(days=7 - i) # Days 7 to 1 ago anomalous_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} From 490de3072ad8f5bd35a89f66364bde1bea9350fa Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 12:40:06 +0000 Subject: [PATCH 06/18] Fix exclude_detection_period_from_training implementation The previous implementation incorrectly filtered out detection period rows completely, preventing them from being scored. This fix uses CASE statements in the window functions to exclude detection period rows from training calculations while still including them in the final output for scoring. Changes: - Modified time_window_aggregation CTE to use CASE statements in AVG, STDDEV, COUNT, LAST_VALUE, and FIRST_VALUE window functions - Removed the 'and not is_detection_period' filter that was excluding rows - Added is_detection_period to SELECT and GROUP BY clauses This allows the flag to work as intended: - WITHOUT flag: detection period included in training, anomaly absorbed - WITH flag: detection period excluded from training, anomaly detected Co-Authored-By: Yosef Arbiv --- .../tests/test_event_freshness_anomalies.py | 4 +--- .../tests/test_freshness_anomalies.py | 8 +++----- .../tests/test_volume_anomalies.py | 20 +++++++++---------- .../get_anomaly_scores_query.sql | 14 ++++++------- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py index 1b99792db..c0b1cf3f3 100644 --- a/integration_tests/tests/test_event_freshness_anomalies.py +++ b/integration_tests/tests/test_event_freshness_anomalies.py @@ -92,9 +92,7 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject): # Anomalies currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) -def test_exclude_detection_period_from_training_event_freshness( - test_id: str, dbt_project: DbtProject -): +def test_exclude_detection_event_freshness(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag for event freshness anomalies. diff --git a/integration_tests/tests/test_freshness_anomalies.py b/integration_tests/tests/test_freshness_anomalies.py index 2202e0d76..bc023c4ed 100644 --- a/integration_tests/tests/test_freshness_anomalies.py +++ b/integration_tests/tests/test_freshness_anomalies.py @@ -237,9 +237,7 @@ def test_first_metric_null(test_id, dbt_project: DbtProject): # Anomalies currently not supported on ClickHouse @pytest.mark.skip_targets(["clickhouse"]) -def test_exclude_detection_period_from_training_freshness( - test_id: str, dbt_project: DbtProject -): +def test_exclude_detection_freshness(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag for freshness anomalies. @@ -260,12 +258,12 @@ def test_exclude_detection_period_from_training_freshness( ) ] - # Generate 7 days of anomalous data (updates every 15 minutes - 50% slower) + # Generate 7 days of anomalous data (updates every 30 minutes - 200% slower) anomalous_data = [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} for date in generate_dates( base_date=now - timedelta(days=7), - step=timedelta(minutes=15), + step=timedelta(minutes=30), days_back=7, ) ] diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 8a4e63e6f..5b8b29d70 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -542,39 +542,39 @@ def test_anomalyless_vol_anomalies_with_test_materialization( # 2. With exclude_detection_period_from_training=False: anomaly is missed (test passes) because training includes the anomaly # 3. With exclude_detection_period_from_training=True: anomaly is detected (test fails) because training excludes the anomaly @pytest.mark.skip_targets(["clickhouse"]) -def test_exclude_detection_period_from_training_use_case( - test_id: str, dbt_project: DbtProject -): +def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): """ Test the exclude_detection_period_from_training flag functionality. Scenario: - - 30 days of normal data (100 rows per day) - - 7 days of anomalous data (150 rows per day) in detection period + - 30 days of normal data with variance (98, 100, 102 rows per day pattern) + - 7 days of anomalous data (114 rows per day) in detection period - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - With exclusion: anomaly excluded from training, test fails (detects anomaly) """ utc_now = datetime.utcnow() - # Generate 30 days of normal data (100 rows per day) + # Generate 30 days of normal data with variance (98, 100, 102 pattern) + normal_pattern = [98, 100, 102] normal_data = [] for i in range(30): date = utc_now - timedelta(days=37 - i) # Days 37 to 8 ago + rows_per_day = normal_pattern[i % 3] # Cycle through 98, 100, 102 normal_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for _ in range(100) # 100 rows per day + for _ in range(rows_per_day) ] ) - # Generate 7 days of anomalous data (150 rows per day) - this will be in detection period + # Generate 7 days of anomalous data (114 rows per day) - this will be in detection period anomalous_data = [] for i in range(7): date = utc_now - timedelta(days=7 - i) # Days 7 to 1 ago anomalous_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for _ in range(150) # 150 rows per day - 50% increase + for _ in range(114) # 114 rows per day - 14% increase from mean ] ) @@ -587,7 +587,7 @@ def test_exclude_detection_period_from_training_use_case( "training_period": {"period": "day", "count": 30}, "detection_period": {"period": "day", "count": 7}, "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 3, # Standard sensitivity + "sensitivity": 5, # Higher sensitivity to allow anomaly to be absorbed # exclude_detection_period_from_training is not set (defaults to False/None) } diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 3cc126485..f98c0af4d 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -176,15 +176,15 @@ bucket_seasonality, bucket_duration_hours, updated_at, - avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg, - {{ elementary.standard_deviation('metric_value') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, - count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, - last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end, - first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start + is_detection_period, + avg(case when not is_detection_period then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg, + {{ elementary.standard_deviation('case when not is_detection_period then metric_value end') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, + count(case when not is_detection_period then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, + last_value(case when not is_detection_period then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end, + first_value(case when not is_detection_period then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start from grouped_metrics where not is_excluded - and not is_detection_period {# NEW: Exclude detection period from training #} - {{ dbt_utils.group_by(13) }} + {{ dbt_utils.group_by(14) }} ), anomaly_scores as ( From 642f0157a344860d7e7e01e2f6df491d271057b2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 12:57:53 +0000 Subject: [PATCH 07/18] Remove freshness anomalies support from this PR Keep only volume anomalies support for exclude_detection_period_from_training flag. Freshness anomalies will be addressed in a separate PR. - Revert changes to test_freshness_anomalies.sql - Revert changes to test_event_freshness_anomalies.sql - Remove test_exclude_detection_freshness test - Remove test_exclude_detection_event_freshness test Co-Authored-By: Yosef Arbiv --- macros/edr/tests/test_event_freshness_anomalies.sql | 5 ++--- macros/edr/tests/test_freshness_anomalies.sql | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/macros/edr/tests/test_event_freshness_anomalies.sql b/macros/edr/tests/test_event_freshness_anomalies.sql index 694e9cd6c..0f6b1af05 100644 --- a/macros/edr/tests/test_event_freshness_anomalies.sql +++ b/macros/edr/tests/test_event_freshness_anomalies.sql @@ -1,4 +1,4 @@ -{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} +{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} {{ config(tags = ['elementary-tests']) }} {% if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} @@ -32,8 +32,7 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period, - exclude_detection_period_from_training=exclude_detection_period_from_training + training_period=training_period ) }} {% endtest %} diff --git a/macros/edr/tests/test_freshness_anomalies.sql b/macros/edr/tests/test_freshness_anomalies.sql index 9e3139e18..abba9b4fc 100644 --- a/macros/edr/tests/test_freshness_anomalies.sql +++ b/macros/edr/tests/test_freshness_anomalies.sql @@ -1,4 +1,4 @@ -{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} +{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %} {{ config(tags = ['elementary-tests']) }} {{ elementary.test_table_anomalies( model=model, @@ -18,8 +18,7 @@ detection_delay=detection_delay, anomaly_exclude_metrics=anomaly_exclude_metrics, detection_period=detection_period, - training_period=training_period, - exclude_detection_period_from_training=exclude_detection_period_from_training + training_period=training_period ) }} {% endtest %} From 3b9ae55d3f72a4fa7cda240275f6f422863acc00 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 13:01:30 +0000 Subject: [PATCH 08/18] Remove freshness test functions from test files Remove test_exclude_detection_freshness and test_exclude_detection_event_freshness test functions that were added for freshness anomalies support. Co-Authored-By: Yosef Arbiv --- .../tests/test_event_freshness_anomalies.py | 78 ------------------- .../tests/test_freshness_anomalies.py | 73 ----------------- 2 files changed, 151 deletions(-) diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py index c0b1cf3f3..607c61271 100644 --- a/integration_tests/tests/test_event_freshness_anomalies.py +++ b/integration_tests/tests/test_event_freshness_anomalies.py @@ -88,81 +88,3 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject): test_vars={"custom_run_started_at": test_started_at.isoformat()}, ) assert result["status"] == "fail" - - -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) -def test_exclude_detection_event_freshness(test_id: str, dbt_project: DbtProject): - """ - Test the exclude_detection_period_from_training flag for event freshness anomalies. - - Scenario: - - 30 days of normal data (event lag of 1 hour) - - 7 days of anomalous data (event lag of 1.5 hours - 50% slower) in detection period - - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - - With exclusion: anomaly excluded from training, test fails (detects anomaly) - """ - now = datetime.now() - - normal_data = [ - { - EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT), - UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1)).strftime(DATE_FORMAT), - } - for date in generate_dates( - base_date=now - timedelta(days=37), step=timedelta(hours=2), days_back=30 - ) - ] - - # Generate 7 days of anomalous data (event lag of 1.5 hours - 50% slower) - anomalous_data = [ - { - EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT), - UPDATE_TIMESTAMP_COLUMN: (date + timedelta(hours=1, minutes=30)).strftime( - DATE_FORMAT - ), - } - for date in generate_dates( - base_date=now - timedelta(days=7), step=timedelta(hours=2), days_back=7 - ) - ] - - all_data = normal_data + anomalous_data - - # Test 1: WITHOUT exclusion (should pass - misses the anomaly) - test_args_without_exclusion = { - "event_timestamp_column": EVENT_TIMESTAMP_COLUMN, - "update_timestamp_column": UPDATE_TIMESTAMP_COLUMN, - "training_period": {"period": "day", "count": 30}, - "detection_period": {"period": "day", "count": 7}, - "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 3, - } - - test_result_without_exclusion = dbt_project.test( - test_id + "_without_exclusion", - TEST_NAME, - test_args_without_exclusion, - data=all_data, - ) - - assert ( - test_result_without_exclusion["status"] == "pass" - ), "Test should pass when anomaly is included in training" - - # Test 2: WITH exclusion (should fail - detects the anomaly) - test_args_with_exclusion = { - **test_args_without_exclusion, - "exclude_detection_period_from_training": True, - } - - test_result_with_exclusion = dbt_project.test( - test_id + "_with_exclusion", - TEST_NAME, - test_args_with_exclusion, - data=all_data, - ) - - assert ( - test_result_with_exclusion["status"] == "fail" - ), "Test should fail when anomaly is excluded from training" diff --git a/integration_tests/tests/test_freshness_anomalies.py b/integration_tests/tests/test_freshness_anomalies.py index bc023c4ed..b53f31d3b 100644 --- a/integration_tests/tests/test_freshness_anomalies.py +++ b/integration_tests/tests/test_freshness_anomalies.py @@ -233,76 +233,3 @@ def test_first_metric_null(test_id, dbt_project: DbtProject): materialization="incremental", ) assert result["status"] == "pass" - - -# Anomalies currently not supported on ClickHouse -@pytest.mark.skip_targets(["clickhouse"]) -def test_exclude_detection_freshness(test_id: str, dbt_project: DbtProject): - """ - Test the exclude_detection_period_from_training flag for freshness anomalies. - - Scenario: - - 30 days of normal data (updates every 10 minutes) - - 7 days of anomalous data (updates every 15 minutes - 50% slower) in detection period - - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly) - - With exclusion: anomaly excluded from training, test fails (detects anomaly) - """ - now = datetime.now() - - normal_data = [ - {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for date in generate_dates( - base_date=now - timedelta(days=37), - step=timedelta(minutes=10), - days_back=30, - ) - ] - - # Generate 7 days of anomalous data (updates every 30 minutes - 200% slower) - anomalous_data = [ - {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for date in generate_dates( - base_date=now - timedelta(days=7), - step=timedelta(minutes=30), - days_back=7, - ) - ] - - all_data = normal_data + anomalous_data - - # Test 1: WITHOUT exclusion (should pass - misses the anomaly) - test_args_without_exclusion = { - "timestamp_column": TIMESTAMP_COLUMN, - "training_period": {"period": "day", "count": 30}, - "detection_period": {"period": "day", "count": 7}, - "time_bucket": {"period": "day", "count": 1}, - "sensitivity": 3, - } - - test_result_without_exclusion = dbt_project.test( - test_id + "_without_exclusion", - TEST_NAME, - test_args_without_exclusion, - data=all_data, - ) - - assert ( - test_result_without_exclusion["status"] == "pass" - ), "Test should pass when anomaly is included in training" - - # Test 2: WITH exclusion (should fail - detects the anomaly) - test_args_with_exclusion = { - **test_args_without_exclusion, - "exclude_detection_period_from_training": True, - } - - test_result_with_exclusion = dbt_project.test( - test_id + "_with_exclusion", - TEST_NAME, - test_args_with_exclusion, - data=all_data, - ) - - assert ( - test_result_with_exclusion["status"] == "fail" - ), "Test should fail when anomaly is excluded from training" From 40a5f36c1507e184ca88e01fb869a6cc4f0c2961 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 09:07:30 +0000 Subject: [PATCH 09/18] Remove package-lock.yml from PR Co-Authored-By: Yosef Arbiv --- integration_tests/dbt_project/package-lock.yml | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 integration_tests/dbt_project/package-lock.yml diff --git a/integration_tests/dbt_project/package-lock.yml b/integration_tests/dbt_project/package-lock.yml deleted file mode 100644 index db4aea8a9..000000000 --- a/integration_tests/dbt_project/package-lock.yml +++ /dev/null @@ -1,7 +0,0 @@ -packages: - - local: ../../ - name: elementary - - name: dbt_utils - package: dbt-labs/dbt_utils - version: 1.3.0 -sha1_hash: 31ab6d0c0f2c12e5eba02f23dfdb5ad74be70ced From 8b5791d3c0d79e2c182e6bfb2a56713114bcad37 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 22 Oct 2025 09:28:00 +0000 Subject: [PATCH 10/18] Remove 'NEW:' references and clean up obvious comments Co-Authored-By: Yosef Arbiv --- integration_tests/tests/test_volume_anomalies.py | 11 +++++------ .../anomaly_detection/get_anomaly_scores_query.sql | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 5b8b29d70..10015d038 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -558,8 +558,8 @@ def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): normal_pattern = [98, 100, 102] normal_data = [] for i in range(30): - date = utc_now - timedelta(days=37 - i) # Days 37 to 8 ago - rows_per_day = normal_pattern[i % 3] # Cycle through 98, 100, 102 + date = utc_now - timedelta(days=37 - i) + rows_per_day = normal_pattern[i % 3] normal_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} @@ -570,15 +570,14 @@ def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): # Generate 7 days of anomalous data (114 rows per day) - this will be in detection period anomalous_data = [] for i in range(7): - date = utc_now - timedelta(days=7 - i) # Days 7 to 1 ago + date = utc_now - timedelta(days=7 - i) anomalous_data.extend( [ {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)} - for _ in range(114) # 114 rows per day - 14% increase from mean + for _ in range(114) # 14% increase from mean ] ) - # Combine all data all_data = normal_data + anomalous_data # Test 1: WITHOUT exclusion (should pass - misses the anomaly because it's included in training) @@ -606,7 +605,7 @@ def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject): # Test 2: WITH exclusion (should fail - detects the anomaly because it's excluded from training) test_args_with_exclusion = { **test_args_without_exclusion, - "exclude_detection_period_from_training": True, # NEW FLAG + "exclude_detection_period_from_training": True, } test_result_with_exclusion = dbt_project.test( diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index f98c0af4d..5d01673cd 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -31,7 +31,7 @@ {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} - {# NEW: Calculate detection period start for exclusion logic #} + {# Calculate detection period start for exclusion logic #} {%- if test_configuration.exclude_detection_period_from_training %} {%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %} {%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %} @@ -148,7 +148,7 @@ bucket_end, {{ bucket_seasonality_expr }} as bucket_seasonality, {{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded, - {# NEW: Flag detection period metrics for exclusion from training #} + {# Flag detection period metrics for exclusion from training #} {% if test_configuration.exclude_detection_period_from_training %} bucket_end > {{ detection_period_start_expr }} {% else %} From 57e7b7c19e2dbeefe95792aa901a8d3492b094fb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 23 Oct 2025 15:39:13 +0000 Subject: [PATCH 11/18] Address PR review comments: rename column, add validation comment, set default to false Co-Authored-By: Yosef Arbiv --- .../get_anomaly_scores_query.sql | 19 +++++++++++-------- macros/edr/tests/test_table_anomalies.sql | 2 +- macros/edr/tests/test_volume_anomalies.sql | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 5d01673cd..6efe84efb 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -31,7 +31,10 @@ {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} - {# Calculate detection period start for exclusion logic #} + {# Calculate detection period start for exclusion logic. + The detection period spans from (detection_end - backfill_days) to detection_end. + This ensures we exclude the most recent backfill_days worth of data from training, + which are the metrics being actively tested for anomalies. #} {%- if test_configuration.exclude_detection_period_from_training %} {%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %} {%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %} @@ -153,7 +156,7 @@ bucket_end > {{ detection_period_start_expr }} {% else %} FALSE - {% endif %} as is_detection_period, + {% endif %} as should_exclude_from_training, bucket_duration_hours, updated_at from grouped_metrics_duplicates @@ -176,12 +179,12 @@ bucket_seasonality, bucket_duration_hours, updated_at, - is_detection_period, - avg(case when not is_detection_period then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg, - {{ elementary.standard_deviation('case when not is_detection_period then metric_value end') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, - count(case when not is_detection_period then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, - last_value(case when not is_detection_period then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end, - first_value(case when not is_detection_period then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start + should_exclude_from_training, + avg(case when not should_exclude_from_training then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg, + {{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, + count(case when not should_exclude_from_training then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, + last_value(case when not should_exclude_from_training then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end, + first_value(case when not should_exclude_from_training then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start from grouped_metrics where not is_excluded {{ dbt_utils.group_by(14) }} diff --git a/macros/edr/tests/test_table_anomalies.sql b/macros/edr/tests/test_table_anomalies.sql index db7e41fe5..b49d450a8 100644 --- a/macros/edr/tests/test_table_anomalies.sql +++ b/macros/edr/tests/test_table_anomalies.sql @@ -1,4 +1,4 @@ -{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=none) %} +{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=false) %} {{ config(tags = ['elementary-tests']) }} {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %} {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %} diff --git a/macros/edr/tests/test_volume_anomalies.sql b/macros/edr/tests/test_volume_anomalies.sql index b96bda4d4..53e175d92 100644 --- a/macros/edr/tests/test_volume_anomalies.sql +++ b/macros/edr/tests/test_volume_anomalies.sql @@ -1,4 +1,4 @@ -{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training) %} +{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %} {{ config(tags = ['elementary-tests']) }} {{ elementary.test_table_anomalies( From 9b9f4350f3de27b01781b2a5d1735236fce24442 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Oct 2025 12:23:57 +0000 Subject: [PATCH 12/18] Improve comment: clarify backfill_days definition and configuration Co-Authored-By: Yosef Arbiv --- .../anomaly_detection/get_anomaly_scores_query.sql | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index 6efe84efb..e7320cf04 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -32,9 +32,11 @@ {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} {# Calculate detection period start for exclusion logic. + backfill_days defines the window of recent data to test for anomalies on each run. + It defaults to 2 days (configurable via vars.backfill_days or test-level parameter). The detection period spans from (detection_end - backfill_days) to detection_end. - This ensures we exclude the most recent backfill_days worth of data from training, - which are the metrics being actively tested for anomalies. #} + When exclude_detection_period_from_training is enabled, metrics in this detection period + are excluded from training statistics to prevent contamination from potentially anomalous data. #} {%- if test_configuration.exclude_detection_period_from_training %} {%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %} {%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %} From e2a11d9bb0da7f64e71598b957efa911c047b9c8 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Oct 2025 15:19:26 +0000 Subject: [PATCH 13/18] Add Redshift-specific override for edr_multi_value_in to fix multi-column IN compatibility issue Co-Authored-By: Yosef Arbiv --- macros/utils/cross_db_utils/multi_value_in.sql | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql index 452147752..6b1f14d1d 100644 --- a/macros/utils/cross_db_utils/multi_value_in.sql +++ b/macros/utils/cross_db_utils/multi_value_in.sql @@ -32,3 +32,15 @@ from {{ target_table }} ) {%- endmacro -%} + +{%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%} + exists ( + select 1 + from {{ target_table }} as _edr_mv_target + where + {%- for i in range(source_cols | length) -%} + {{ source_cols[i] }} = _edr_mv_target.{{ target_cols[i] }} + {%- if not loop.last %} and {% endif %} + {%- endfor %} + ) +{%- endmacro -%} From ecc6a80302a38f6767b726f7e1d3a5d717362bb1 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Oct 2025 15:58:33 +0000 Subject: [PATCH 14/18] Fix whitespace handling in redshift__edr_multi_value_in to prevent syntax error Co-Authored-By: Yosef Arbiv --- macros/utils/cross_db_utils/multi_value_in.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql index 6b1f14d1d..98c014f01 100644 --- a/macros/utils/cross_db_utils/multi_value_in.sql +++ b/macros/utils/cross_db_utils/multi_value_in.sql @@ -38,9 +38,9 @@ select 1 from {{ target_table }} as _edr_mv_target where - {%- for i in range(source_cols | length) -%} + {%- for i in range(source_cols | length) %} {{ source_cols[i] }} = _edr_mv_target.{{ target_cols[i] }} - {%- if not loop.last %} and {% endif %} + {%- if not loop.last %} and {% endif -%} {%- endfor %} ) {%- endmacro -%} From 9ddf0194a2d47aca7c67c238e91e7458aacae35f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Oct 2025 18:16:35 +0000 Subject: [PATCH 15/18] Replace Redshift EXISTS with CONCAT approach to avoid correlated subquery error Co-Authored-By: Yosef Arbiv --- .../utils/cross_db_utils/multi_value_in.sql | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql index 98c014f01..5f79509f6 100644 --- a/macros/utils/cross_db_utils/multi_value_in.sql +++ b/macros/utils/cross_db_utils/multi_value_in.sql @@ -34,13 +34,17 @@ {%- endmacro -%} {%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%} - exists ( - select 1 - from {{ target_table }} as _edr_mv_target - where - {%- for i in range(source_cols | length) %} - {{ source_cols[i] }} = _edr_mv_target.{{ target_cols[i] }} - {%- if not loop.last %} and {% endif -%} - {%- endfor %} + -- Redshift doesn't support multi-value IN, so we emulate it with CONCAT + concat( + {%- for val in source_cols -%} + {{ elementary.edr_cast_as_string(val) -}} + {%- if not loop.last %}, {% endif %} + {%- endfor %} + ) in ( + select concat({%- for val in target_cols -%} + {{ elementary.edr_cast_as_string(val) -}} + {%- if not loop.last %}, {% endif %} + {%- endfor %}) + from {{ target_table }} ) {%- endmacro -%} From e4f5c73ee942531a533a640cf803406d783dbf6a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sun, 26 Oct 2025 18:58:18 +0000 Subject: [PATCH 16/18] Fix Redshift PARTITION BY error by excluding constant bucket_seasonality expression Co-Authored-By: Yosef Arbiv --- .../get_anomaly_scores_query.sql | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index e7320cf04..b72325564 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -17,16 +17,29 @@ {%- if test_configuration.seasonality == 'day_of_week' %} {%- set bucket_seasonality_expr = elementary.edr_day_of_week_expression('bucket_end') %} + {%- set has_seasonality = true %} {%- elif test_configuration.seasonality == 'hour_of_day' %} {%- set bucket_seasonality_expr = elementary.edr_hour_of_day_expression('bucket_end') %} + {%- set has_seasonality = true %} {%- elif test_configuration.seasonality == 'hour_of_week' %} {%- set bucket_seasonality_expr = elementary.edr_hour_of_week_expression('bucket_end') %} + {%- set has_seasonality = true %} {%- else %} {%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %} + {%- set has_seasonality = false %} {%- endif %} + + {# Build PARTITION BY clause for window functions. + Redshift doesn't support constant expressions in PARTITION BY, so we exclude + bucket_seasonality when it's a constant (i.e., when has_seasonality is false). #} + {%- set partition_by_keys = "metric_name, full_table_name, column_name, dimension, dimension_value" %} + {%- if has_seasonality %} + {%- set partition_by_keys = partition_by_keys ~ ", bucket_seasonality" %} + {%- endif %} + {%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %} {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %} {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %} @@ -182,11 +195,11 @@ bucket_duration_hours, updated_at, should_exclude_from_training, - avg(case when not should_exclude_from_training then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg, - {{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, - count(case when not should_exclude_from_training then metric_value end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, - last_value(case when not should_exclude_from_training then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end, - first_value(case when not should_exclude_from_training then bucket_end end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start + avg(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg, + {{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev, + count(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_set_size, + last_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end, + first_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start from grouped_metrics where not is_excluded {{ dbt_utils.group_by(14) }} From 842a255af008518a54e3852a902586512c3c3fc5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 27 Oct 2025 05:55:34 +0000 Subject: [PATCH 17/18] Add explanatory comment to redshift__edr_multi_value_in macro Co-Authored-By: Yosef Arbiv --- macros/utils/cross_db_utils/multi_value_in.sql | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql index 5f79509f6..846fe681b 100644 --- a/macros/utils/cross_db_utils/multi_value_in.sql +++ b/macros/utils/cross_db_utils/multi_value_in.sql @@ -34,7 +34,14 @@ {%- endmacro -%} {%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%} - -- Redshift doesn't support multi-value IN, so we emulate it with CONCAT + {# Redshift doesn't support multi-column IN subqueries (tuple IN) like: + (col1, col2) IN (SELECT col1, col2 FROM table) + + This limitation causes the error: "This type of IN/NOT IN query is not supported yet" + + To work around this, we use CONCAT to combine multiple columns into a single scalar + value on both sides of the IN comparison, similar to the BigQuery implementation. + This maintains the same semantics while avoiding Redshift's tuple IN limitation. #} concat( {%- for val in source_cols -%} {{ elementary.edr_cast_as_string(val) -}} From 961ad2a37aff9323f6a114084c3d8067c2595767 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 27 Oct 2025 10:14:53 +0000 Subject: [PATCH 18/18] Add explanatory comment for partition_by_keys dynamic PARTITION BY logic Co-Authored-By: Yosef Arbiv --- .../anomaly_detection/get_anomaly_scores_query.sql | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql index b72325564..3ff296f5f 100644 --- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql +++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql @@ -32,9 +32,15 @@ {%- set has_seasonality = false %} {%- endif %} - {# Build PARTITION BY clause for window functions. - Redshift doesn't support constant expressions in PARTITION BY, so we exclude - bucket_seasonality when it's a constant (i.e., when has_seasonality is false). #} + {# Build PARTITION BY clause for window functions dynamically to work around Redshift limitation. + + Redshift doesn't allow constant expressions in PARTITION BY of window functions. When seasonality + is not configured, bucket_seasonality becomes a constant ('no_seasonality'::text), which triggers + the error "constant expressions are not supported in partition by clauses." + + We build the partition keys dynamically, always including the core metric keys and only appending + bucket_seasonality when it's computed from timestamps (has_seasonality = true). Partitioning by + a constant has no effect anyway, so this preserves behavior while keeping Redshift happy. #} {%- set partition_by_keys = "metric_name, full_table_name, column_name, dimension, dimension_value" %} {%- if has_seasonality %} {%- set partition_by_keys = partition_by_keys ~ ", bucket_seasonality" %}