diff --git a/integration_tests/tests/test_column_anomalies.py b/integration_tests/tests/test_column_anomalies.py
index d4217009c..342f482f9 100644
--- a/integration_tests/tests/test_column_anomalies.py
+++ b/integration_tests/tests/test_column_anomalies.py
@@ -578,3 +578,112 @@ def test_col_anom_excl_detect_train(test_id: str, dbt_project: DbtProject):
         "Expected FAIL when exclude_detection_period_from_training=True "
         "(detection data excluded from training baseline, anomaly detected)"
     )
+
+
+@pytest.mark.skip_targets(["clickhouse", "redshift", "dremio"])
+def test_col_excl_detect_train_monthly(test_id: str, dbt_project: DbtProject):
+    """
+    Test exclude_detection_period_from_training with monthly time buckets for column anomalies.
+
+    This tests the fix where the detection period is set to the bucket size
+    when the bucket period exceeds backfill_days. With monthly buckets (30 days)
+    and default backfill_days (2), without the fix the 2-day exclusion window
+    cannot contain any monthly bucket_end, making exclusion ineffective.
+
+    detection_period is intentionally NOT set so that backfill_days stays at
+    its default (2), which is smaller than the monthly bucket (30 days).
+    Setting detection_period would override backfill_days and mask the bug.
+
+    Scenario:
+    - 12 months of normal data with low null count (~10 nulls/day, ~300/month)
+    - 1 month of anomalous data with high null count (~50 nulls/day, ~1500/month)
+    - time_bucket: month (30 days >> default backfill_days of 2)
+    - Without exclusion: anomaly absorbed into training → test passes
+    - With exclusion + fix: anomaly excluded from training → test fails
+    """
+    utc_now = datetime.utcnow().date()
+    current_month_1st = utc_now.replace(day=1)
+
+    anomaly_month_start = (current_month_1st - timedelta(days=1)).replace(day=1)
+    normal_month_start = anomaly_month_start.replace(year=anomaly_month_start.year - 1)
+
+    normal_data: List[Dict[str, Any]] = []
+    day = normal_month_start
+    day_idx = 0
+    while day < anomaly_month_start:
+        null_count = 7 + (day_idx % 7)
+        normal_data.extend(
+            [
+                {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": superhero}
+                for superhero in ["Superman", "Batman", "Wonder Woman", "Flash"] * 10
+            ]
+        )
+        normal_data.extend(
+            [
+                {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": None}
+                for _ in range(null_count)
+            ]
+        )
+        day += timedelta(days=1)
+        day_idx += 1
+
+    anomalous_data: List[Dict[str, Any]] = []
+    day = anomaly_month_start
+    while day < utc_now:
+        anomalous_data.extend(
+            [
+                {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": superhero}
+                for superhero in ["Superman", "Batman", "Wonder Woman", "Flash"] * 10
+            ]
+        )
+        anomalous_data.extend(
+            [
+                {TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT), "superhero": None}
+                for _ in range(50)
+            ]
+        )
+        day += timedelta(days=1)
+
+    all_data = normal_data + anomalous_data
+
+    test_args_without_exclusion = {
+        "timestamp_column": TIMESTAMP_COLUMN,
+        "column_anomalies": ["null_count"],
+        "time_bucket": {"period": "month", "count": 1},
+        "training_period": {"period": "day", "count": 365},
+        "min_training_set_size": 5,
+        "anomaly_sensitivity": 10,
+        "anomaly_direction": "spike",
+        "exclude_detection_period_from_training": False,
+    }
+
+    test_result_without = dbt_project.test(
+        test_id + "_f",
+        DBT_TEST_NAME,
+        test_args_without_exclusion,
+        data=all_data,
+        test_column="superhero",
+        test_vars={"force_metrics_backfill": True},
+    )
+    assert test_result_without["status"] == "pass", (
+        "Expected PASS when exclude_detection_period_from_training=False "
+        "(detection data included in training baseline)"
+    )
+
+    test_args_with_exclusion = {
+        **test_args_without_exclusion,
+        "exclude_detection_period_from_training": True,
+    }
+
+    test_result_with = dbt_project.test(
+        test_id + "_t",
+        DBT_TEST_NAME,
+        test_args_with_exclusion,
+        data=all_data,
+        test_column="superhero",
+        test_vars={"force_metrics_backfill": True},
+    )
+    assert test_result_with["status"] == "fail", (
+        "Expected FAIL when exclude_detection_period_from_training=True "
+        "(large bucket fix: detection period set to bucket size)"
+    )
diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py
index 10015d038..87c789fbe 100644
--- a/integration_tests/tests/test_volume_anomalies.py
+++ b/integration_tests/tests/test_volume_anomalies.py
@@ -619,3 +619,88 @@ def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
     assert (
         test_result_with_exclusion["status"] == "fail"
     ), "Test should fail when anomaly is excluded from training"
+
+
+@pytest.mark.skip_targets(["clickhouse", "redshift", "dremio"])
+def test_excl_detect_train_monthly(test_id: str, dbt_project: DbtProject):
+    """
+    Test exclude_detection_period_from_training with monthly time buckets.
+
+    This tests the fix where the detection period is set to the bucket size
+    when the bucket period exceeds backfill_days. With monthly buckets (30 days)
+    and default backfill_days (2), without the fix the 2-day exclusion window
+    cannot contain any monthly bucket_end, making exclusion ineffective.
+
+    detection_period is intentionally NOT set so that backfill_days stays at
+    its default (2), which is smaller than the monthly bucket (30 days).
+    Setting detection_period would override backfill_days and mask the bug.
+
+    Scenario:
+    - 12 months of normal data (~20 rows/day, ~600/month)
+    - 1 month of anomalous data (~100 rows/day, ~3000/month)
+    - time_bucket: month (30 days >> default backfill_days of 2)
+    - Without exclusion: anomaly absorbed into training → test passes
+    - With exclusion + fix: anomaly excluded from training → test fails
+    """
+    utc_now = datetime.utcnow()
+    current_month_1st = utc_now.replace(
+        day=1, hour=0, minute=0, second=0, microsecond=0
+    )
+
+    anomaly_month_start = (current_month_1st - timedelta(days=1)).replace(day=1)
+    normal_month_start = anomaly_month_start.replace(year=anomaly_month_start.year - 1)
+
+    normal_data = []
+    day = normal_month_start
+    day_idx = 0
+    while day < anomaly_month_start:
+        rows_per_day = 17 + (day_idx % 7)
+        normal_data.extend(
+            [{TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT)} for _ in range(rows_per_day)]
+        )
+        day += timedelta(days=1)
+        day_idx += 1
+
+    anomalous_data = []
+    day = anomaly_month_start
+    while day < utc_now:
+        anomalous_data.extend(
+            [{TIMESTAMP_COLUMN: day.strftime(DATE_FORMAT)} for _ in range(100)]
+        )
+        day += timedelta(days=1)
+
+    all_data = normal_data + anomalous_data
+
+    test_args_without_exclusion = {
+        **DBT_TEST_ARGS,
+        "training_period": {"period": "day", "count": 365},
+        "time_bucket": {"period": "month", "count": 1},
+        "sensitivity": 10,
+    }
+
+    test_result_without = dbt_project.test(
+        test_id + "_without",
+        DBT_TEST_NAME,
+        test_args_without_exclusion,
+        data=all_data,
+        test_vars={"force_metrics_backfill": True},
+    )
+    assert (
+        test_result_without["status"] == "pass"
+    ), "Test should pass when anomaly is included in training"
+
+    test_args_with_exclusion = {
+        **test_args_without_exclusion,
+        "exclude_detection_period_from_training": True,
+    }
+
+    test_result_with = dbt_project.test(
+        test_id + "_with",
+        DBT_TEST_NAME,
+        test_args_with_exclusion,
+        data=all_data,
+        test_vars={"force_metrics_backfill": True},
+    )
+    assert (
+        test_result_with["status"] == "fail"
+    ), "Test should fail when anomaly is excluded from training (large bucket fix)"
diff --git a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
index bed9abd0a..06e20cc94 100644
--- a/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
+++ b/macros/edr/tests/test_configuration/get_anomalies_test_configuration.sql
@@ -39,6 +39,13 @@
   {%- set anomaly_direction = elementary.get_anomaly_direction(anomaly_direction, model_graph_node) %}
   {%- set detection_period = elementary.get_test_argument('detection_period', detection_period, model_graph_node) -%}
   {%- set backfill_days = elementary.detection_period_to_backfill_days(detection_period, backfill_days, model_graph_node) -%}
+  {%- if metric_props.time_bucket %}
+    {%- set bucket_in_days = elementary.convert_period(metric_props.time_bucket, 'day').count %}
+    {%- if bucket_in_days > backfill_days %}
+      {%- do elementary.edr_log("backfill_days increased from " ~ backfill_days ~ " to " ~ bucket_in_days ~ " to match time bucket size.") %}
+      {%- set backfill_days = bucket_in_days %}
+    {%- endif %}
+  {%- endif %}
   {%- set fail_on_zero = elementary.get_test_argument('fail_on_zero', fail_on_zero, model_graph_node) %}
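
Note (reviewer sketch, not part of the diff): the macro change above widens backfill_days whenever the configured time bucket, converted to days, exceeds it, so that the exclusion window spans at least one full bucket_end. Below is a minimal Python equivalent of that logic under stated assumptions: the PERIOD_TO_DAYS conversion table and the adjust_backfill_days helper are hypothetical names for illustration; in the package itself the conversion is done by elementary.convert_period inside the Jinja macro.

    # Hypothetical sketch mirroring the added Jinja block; conversion factors are assumptions.
    PERIOD_TO_DAYS = {"hour": 1 / 24, "day": 1, "week": 7, "month": 30}

    def adjust_backfill_days(time_bucket: dict, backfill_days: int) -> int:
        """Widen backfill_days to cover at least one time bucket."""
        bucket_in_days = int(PERIOD_TO_DAYS[time_bucket["period"]] * time_bucket["count"])
        # Without this, a 2-day exclusion window can never contain a monthly bucket_end,
        # so exclude_detection_period_from_training has no effect for large buckets.
        return max(backfill_days, bucket_in_days)

    # With monthly buckets and the default backfill_days of 2, the window grows to 30 days;
    # smaller buckets leave backfill_days unchanged.
    assert adjust_backfill_days({"period": "month", "count": 1}, 2) == 30
    assert adjust_backfill_days({"period": "day", "count": 1}, 2) == 2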