diff --git a/integration_tests/tests/test_anomaly_test_configuration.py b/integration_tests/tests/test_anomaly_test_configuration.py
index 10cc72688..b91507fde 100644
--- a/integration_tests/tests/test_anomaly_test_configuration.py
+++ b/integration_tests/tests/test_anomaly_test_configuration.py
@@ -88,6 +88,7 @@ def get_value(key: str):
         "freshness_column": None,  # Deprecated
         "dimensions": None,  # should only be set at the test level,
         "exclude_final_results": get_value("exclude_final_results"),
+        "exclude_detection_period_from_training": None,
     }
diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py
index 7f01091fe..10015d038 100644
--- a/integration_tests/tests/test_volume_anomalies.py
+++ b/integration_tests/tests/test_volume_anomalies.py
@@ -534,3 +534,88 @@ def test_anomalyless_vol_anomalies_with_test_materialization(
         test_vars={"enable_elementary_test_materialization": True},
     )
     assert test_result["status"] == "pass"
+
+
+# Test for the exclude_detection_period_from_training functionality.
+# This test demonstrates the use case where:
+# 1. The detection period contains anomalous data that would normally be included in training.
+# 2. With exclude_detection_period_from_training=False: the anomaly is missed (test passes) because training includes it.
+# 3. With exclude_detection_period_from_training=True: the anomaly is detected (test fails) because training excludes it.
+@pytest.mark.skip_targets(["clickhouse"])
+def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
+    """
+    Test the exclude_detection_period_from_training flag functionality.
+
+    Scenario:
+    - 30 days of normal data with variance (98, 100, 102 rows per day pattern)
+    - 7 days of anomalous data (114 rows per day) in the detection period
+    - Without exclusion: the anomaly is folded into the training baseline, so the test passes (anomaly missed)
+    - With exclusion: the anomaly is kept out of training, so the test fails (anomaly detected)
+    """
+    utc_now = datetime.utcnow()
+
+    # Generate 30 days of normal data with variance (98, 100, 102 pattern)
+    normal_pattern = [98, 100, 102]
+    normal_data = []
+    for i in range(30):
+        date = utc_now - timedelta(days=37 - i)
+        rows_per_day = normal_pattern[i % 3]
+        normal_data.extend(
+            [
+                {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
+                for _ in range(rows_per_day)
+            ]
+        )
+
+    # Generate 7 days of anomalous data (114 rows per day) - this falls in the detection period
+    anomalous_data = []
+    for i in range(7):
+        date = utc_now - timedelta(days=7 - i)
+        anomalous_data.extend(
+            [
+                {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
+                for _ in range(114)  # 14% increase over the normal mean of 100
+            ]
+        )
+
+    all_data = normal_data + anomalous_data
+
+    # Test 1: WITHOUT exclusion (should pass - misses the anomaly because it is included in training)
+    test_args_without_exclusion = {
+        **DBT_TEST_ARGS,
+        "training_period": {"period": "day", "count": 30},
+        "detection_period": {"period": "day", "count": 7},
+        "time_bucket": {"period": "day", "count": 1},
+        "sensitivity": 5,  # High z-score threshold, so the contaminated baseline can absorb the anomaly
+        # exclude_detection_period_from_training is not set (defaults to False/None)
+    }
+
+    test_result_without_exclusion = dbt_project.test(
+        test_id + "_without_exclusion",
+        DBT_TEST_NAME,
+        test_args_without_exclusion,
+        data=all_data,
+    )
+
+    # This should PASS because the anomaly is included in training, making it part of the baseline
+    assert (
+        test_result_without_exclusion["status"] == "pass"
+    ), "Test should pass when anomaly is included in training"
+
+    # Test 2: WITH exclusion (should fail - detects the anomaly because it is excluded from training)
+    test_args_with_exclusion = {
+        **test_args_without_exclusion,
+        "exclude_detection_period_from_training": True,
+    }
+
+    test_result_with_exclusion = dbt_project.test(
+        test_id + "_with_exclusion",
+        DBT_TEST_NAME,
+        test_args_with_exclusion,
+        data=all_data,
+    )
+
+    # This should FAIL because the anomaly is excluded from training, so it is detected as anomalous
+    assert (
+        test_result_with_exclusion["status"] == "fail"
+    ), "Test should fail when anomaly is excluded from training"
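A quick back-of-envelope check of why a sensitivity of 5 separates the two cases above. This is a rough
sketch of the final detection bucket only, assuming sensitivity acts as the z-score threshold; the macro
uses an expanding per-bucket window and its own stddev implementation, so the numbers are indicative,
not exact:

    import statistics

    normal = [98, 100, 102] * 10  # 30 training days, mean 100
    anomaly = 114                 # daily volume during the 7-day detection period

    # With exclusion, training sees only the normal days:
    z_clean = (anomaly - statistics.mean(normal)) / statistics.pstdev(normal)
    # ~8.6 -> well above the threshold of 5, so the anomaly is flagged (dbt test fails)

    # Without exclusion, the seven anomalous days leak into the training stats:
    contaminated = normal + [anomaly] * 7
    z_contam = (anomaly - statistics.mean(contaminated)) / statistics.pstdev(contaminated)
    # ~2.0 -> below 5, the anomaly is absorbed into the baseline (dbt test passes)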
diff --git a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql
index 4819ae772..3ff296f5f 100644
--- a/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql
+++ b/macros/edr/data_monitoring/anomaly_detection/get_anomaly_scores_query.sql
@@ -17,20 +17,50 @@
     {%- if test_configuration.seasonality == 'day_of_week' %}
         {%- set bucket_seasonality_expr = elementary.edr_day_of_week_expression('bucket_end') %}
+        {%- set has_seasonality = true %}
     {%- elif test_configuration.seasonality == 'hour_of_day' %}
         {%- set bucket_seasonality_expr = elementary.edr_hour_of_day_expression('bucket_end') %}
+        {%- set has_seasonality = true %}
     {%- elif test_configuration.seasonality == 'hour_of_week' %}
        {%- set bucket_seasonality_expr = elementary.edr_hour_of_week_expression('bucket_end') %}
+        {%- set has_seasonality = true %}
     {%- else %}
         {%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
+        {%- set has_seasonality = false %}
     {%- endif %}
+
+    {# Build the PARTITION BY clause for the window functions dynamically to work around a Redshift limitation.
+
+       Redshift doesn't allow constant expressions in the PARTITION BY of window functions. When seasonality
+       is not configured, bucket_seasonality becomes a constant ('no_seasonality'::text), which triggers
+       the error "constant expressions are not supported in partition by clauses".
+
+       We build the partition keys dynamically, always including the core metric keys and only appending
+       bucket_seasonality when it is computed from timestamps (has_seasonality = true). Partitioning by
+       a constant has no effect anyway, so this preserves behavior while keeping Redshift happy. #}
+    {%- set partition_by_keys = "metric_name, full_table_name, column_name, dimension, dimension_value" %}
+    {%- if has_seasonality %}
+        {%- set partition_by_keys = partition_by_keys ~ ", bucket_seasonality" %}
+    {%- endif %}
+
     {%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
     {%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %}
     {%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}
+
+    {# Calculate the detection period start for the exclusion logic.
+       backfill_days defines the window of recent data to test for anomalies on each run.
+       It defaults to 2 days (configurable via vars.backfill_days or a test-level parameter).
+       The detection period spans from (detection_end - backfill_days) to detection_end.
+       When exclude_detection_period_from_training is enabled, metrics in this detection period
+       are excluded from the training statistics to prevent contamination from potentially anomalous data. #}
+    {%- if test_configuration.exclude_detection_period_from_training %}
+        {%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %}
+        {%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %}
+    {%- endif %}
+
     {# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the bucket end (which is the actual time of the test) #}
     {%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %}
@@ -142,6 +172,12 @@
                 bucket_end,
                 {{ bucket_seasonality_expr }} as bucket_seasonality,
                 {{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded,
+                {# Flag detection period metrics for exclusion from training #}
+                {% if test_configuration.exclude_detection_period_from_training %}
+                    bucket_end > {{ detection_period_start_expr }}
+                {% else %}
+                    FALSE
+                {% endif %} as should_exclude_from_training,
                 bucket_duration_hours,
                 updated_at
             from grouped_metrics_duplicates
@@ -164,14 +200,15 @@
             bucket_seasonality,
             bucket_duration_hours,
             updated_at,
-            avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
-            {{ elementary.standard_deviation('metric_value') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
-            count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
-            last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
-            first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
+            should_exclude_from_training,
+            avg(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
+            {{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
+            count(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
+            last_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end,
+            first_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start
         from grouped_metrics
         where not is_excluded
-        {{ dbt_utils.group_by(13) }}
+        {{ dbt_utils.group_by(14) }}
     ),

     anomaly_scores as (
diff --git a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
index fa9d061e6..41d338f62 100644
--- a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
+++ b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
@@ -23,7 +23,8 @@
                                                anomaly_exclude_metrics,
                                                detection_period,
                                                training_period,
-                                               exclude_final_results) %}
+                                               exclude_final_results,
+                                               exclude_detection_period_from_training) %}

 {%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
 {# Changes in these configs impact the metric id of the test. #}
@@ -53,6 +54,7 @@
     {% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %}
     {% set exclude_final_results = elementary.get_exclude_final_results(exclude_final_results) %}
+    {% set exclude_detection_period_from_training = elementary.get_test_argument('exclude_detection_period_from_training', exclude_detection_period_from_training, model_graph_node) %}

     {% set test_configuration =
          {'timestamp_column': metric_props.timestamp_column,
@@ -71,7 +73,8 @@
           'fail_on_zero': fail_on_zero,
           'detection_delay': detection_delay,
           'anomaly_exclude_metrics': anomaly_exclude_metrics,
-          'exclude_final_results': exclude_final_results
+          'exclude_final_results': exclude_final_results,
+          'exclude_detection_period_from_training': exclude_detection_period_from_training
          } %}
     {%- set test_configuration = elementary.undefined_dict_keys_to_none(test_configuration) -%}
     {%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}
diff --git a/macros/edr/tests/test_table_anomalies.sql b/macros/edr/tests/test_table_anomalies.sql
index 662bbc269..b49d450a8 100644
--- a/macros/edr/tests/test_table_anomalies.sql
+++ b/macros/edr/tests/test_table_anomalies.sql
@@ -1,4 +1,4 @@
-{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %}
+{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=false) %}
 {{ config(tags = ['elementary-tests']) }}
 {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
     {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
@@ -37,7 +37,8 @@
                                                                   detection_delay=detection_delay,
                                                                   anomaly_exclude_metrics=anomaly_exclude_metrics,
                                                                   detection_period=detection_period,
-                                                                  training_period=training_period) %}
+                                                                  training_period=training_period,
+                                                                  exclude_detection_period_from_training=exclude_detection_period_from_training) %}
 {% if not test_configuration %}
     {{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
diff --git a/macros/edr/tests/test_volume_anomalies.sql b/macros/edr/tests/test_volume_anomalies.sql
index 1c672522f..53e175d92 100644
--- a/macros/edr/tests/test_volume_anomalies.sql
+++ b/macros/edr/tests/test_volume_anomalies.sql
@@ -1,4 +1,4 @@
-{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
+{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
 {{ config(tags = ['elementary-tests']) }}
 {{ elementary.test_table_anomalies(
@@ -20,7 +20,8 @@
         detection_delay=detection_delay,
         anomaly_exclude_metrics=anomaly_exclude_metrics,
         detection_period=detection_period,
-        training_period=training_period
+        training_period=training_period,
+        exclude_detection_period_from_training=exclude_detection_period_from_training
     ) }}
 {% endtest %}
diff --git a/macros/utils/cross_db_utils/multi_value_in.sql b/macros/utils/cross_db_utils/multi_value_in.sql
index 452147752..846fe681b 100644
--- a/macros/utils/cross_db_utils/multi_value_in.sql
+++ b/macros/utils/cross_db_utils/multi_value_in.sql
@@ -32,3 +32,26 @@
         from {{ target_table }}
     )
 {%- endmacro -%}
+
+{%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%}
+    {# Redshift doesn't support multi-column IN subqueries (tuple IN) like:
+           (col1, col2) IN (SELECT col1, col2 FROM table)
+
+       This limitation causes the error: "This type of IN/NOT IN query is not supported yet".
+
+       To work around this, we use CONCAT to combine multiple columns into a single scalar
+       value on both sides of the IN comparison, similar to the BigQuery implementation.
+       This maintains the same semantics while avoiding Redshift's tuple IN limitation. #}
+    concat(
+        {%- for val in source_cols -%}
+            {{ elementary.edr_cast_as_string(val) -}}
+            {%- if not loop.last %}, {% endif %}
+        {%- endfor %}
+    ) in (
+        select concat({%- for val in target_cols -%}
+            {{ elementary.edr_cast_as_string(val) -}}
+            {%- if not loop.last %}, {% endif %}
+        {%- endfor %})
+        from {{ target_table }}
+    )
+{%- endmacro -%}
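Usage note (illustrative): once this lands, the flag is opted into per test like any other anomaly-test
argument in schema.yml, e.g. `exclude_detection_period_from_training: true` under an
`elementary.volume_anomalies` or `elementary.table_anomalies` test block. Since the diff wires it through
`elementary.get_test_argument` with the model graph node, it should also resolve from model-level config.
It defaults to false, so existing tests keep their current behavior.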