diff --git a/integration_tests/tests/test_event_freshness_anomalies.py b/integration_tests/tests/test_event_freshness_anomalies.py
index 607c61271..c07f338e1 100644
--- a/integration_tests/tests/test_event_freshness_anomalies.py
+++ b/integration_tests/tests/test_event_freshness_anomalies.py
@@ -88,3 +88,105 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject):
         test_vars={"custom_run_started_at": test_started_at.isoformat()},
     )
     assert result["status"] == "fail"
+
+
+# Anomalies currently not supported on ClickHouse
+@pytest.mark.skip_targets(["clickhouse"])
+def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
+    """
+    Test the exclude_detection_period_from_training flag for event freshness anomalies.
+
+    Scenario:
+    - 7 days of normal data (2-8 minute lag between event and update) - training period
+    - 7 days of anomalous data (5 hour lag) - detection period
+    - Without exclusion: the anomaly is included in the training baseline, so the test passes (misses the anomaly)
+    - With exclusion: the anomaly is excluded from training, so the test fails (detects the anomaly)
+
+    """
+    utc_now = datetime.utcnow()
+    test_started_at = (utc_now + timedelta(days=1)).replace(
+        hour=0, minute=0, second=0, microsecond=0
+    )
+
+    # Generate 7 days of normal data with varying lag (2-8 minutes) to ensure training_stddev > 0
+    training_lags_minutes = [2, 3, 4, 5, 6, 7, 8]
+    normal_data = []
+    for i in range(7):
+        event_date = test_started_at - timedelta(days=14 - i)
+        event_time = event_date.replace(hour=12, minute=0, second=0, microsecond=0)
+        update_time = event_time + timedelta(minutes=training_lags_minutes[i])
+        normal_data.append(
+            {
+                EVENT_TIMESTAMP_COLUMN: event_time.strftime(DATE_FORMAT),
+                UPDATE_TIMESTAMP_COLUMN: update_time.strftime(DATE_FORMAT),
+            }
+        )
+
+    # Generate 7 days of anomalous data with a 5-hour lag (detection period)
+    anomalous_data = []
+    for i in range(7):
+        event_date = test_started_at - timedelta(days=7 - i)
+        event_time = event_date.replace(hour=12, minute=0, second=0, microsecond=0)
+        update_time = event_time + timedelta(hours=5)
+        anomalous_data.append(
+            {
+                EVENT_TIMESTAMP_COLUMN: event_time.strftime(DATE_FORMAT),
+                UPDATE_TIMESTAMP_COLUMN: update_time.strftime(DATE_FORMAT),
+            }
+        )
+
+    all_data = normal_data + anomalous_data
+
+    # Test 1: WITHOUT exclusion (should pass - misses the anomaly because it is included in training)
+    test_args_without_exclusion = {
+        "event_timestamp_column": EVENT_TIMESTAMP_COLUMN,
+        "update_timestamp_column": UPDATE_TIMESTAMP_COLUMN,
+        "days_back": 14,  # Scoring window: 14 days, covering both training and detection
+        "backfill_days": 7,  # Detection period: the last 7 days before test_started_at
+        "time_bucket": {
+            "period": "day",
+            "count": 1,
+        },  # Daily buckets to avoid boundary issues
+        "sensitivity": 3,
+        "anomaly_direction": "spike",  # Explicit direction since we're testing increased lag
+        "min_training_set_size": 5,  # Explicit minimum to avoid threshold issues
+        # exclude_detection_period_from_training is not set (defaults to false)
+    }
+
+    test_result_without_exclusion = dbt_project.test(
+        test_id + "_without_exclusion",
+        TEST_NAME,
+        test_args_without_exclusion,
+        data=all_data,
+        test_vars={
+            "custom_run_started_at": test_started_at.isoformat(),
+            "force_metrics_backfill": True,
+        },
+    )
+
+    # This should PASS because the anomaly is included in training, making it part of the baseline
+    assert (
+        test_result_without_exclusion["status"] == "pass"
+    ), "Test should pass when the anomaly is included in training"
+
+    # Test 2: WITH exclusion (should fail - detects the anomaly because it is excluded from training)
+    test_args_with_exclusion = {
+        **test_args_without_exclusion,
+        "exclude_detection_period_from_training": True,
+    }
+
+    test_result_with_exclusion = dbt_project.test(
+        test_id + "_with_exclusion",
+        TEST_NAME,
+        test_args_with_exclusion,
+        data=all_data,
+        test_vars={
+            "custom_run_started_at": test_started_at.isoformat(),
+            "force_metrics_backfill": True,
+        },
+    )
+
+    # This should FAIL because the anomaly is excluded from training, so it is detected as anomalous
+    assert (
+        test_result_with_exclusion["status"] == "fail"
+    ), "Test should fail when the anomaly is excluded from training"
diff --git a/macros/edr/tests/test_event_freshness_anomalies.sql b/macros/edr/tests/test_event_freshness_anomalies.sql
index 0f6b1af05..e01017596 100644
--- a/macros/edr/tests/test_event_freshness_anomalies.sql
+++ b/macros/edr/tests/test_event_freshness_anomalies.sql
@@ -1,4 +1,4 @@
-{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
+{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
 {{ config(tags = ['elementary-tests']) }}
 {% if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
 {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
@@ -32,7 +32,8 @@
         detection_delay=detection_delay,
         anomaly_exclude_metrics=anomaly_exclude_metrics,
         detection_period=detection_period,
-        training_period=training_period
+        training_period=training_period,
+        exclude_detection_period_from_training=exclude_detection_period_from_training
     ) }}
 {% endif %}
 {% endtest %}
diff --git a/macros/edr/tests/test_freshness_anomalies.sql b/macros/edr/tests/test_freshness_anomalies.sql
index abba9b4fc..c3a76af3e 100644
--- a/macros/edr/tests/test_freshness_anomalies.sql
+++ b/macros/edr/tests/test_freshness_anomalies.sql
@@ -1,4 +1,4 @@
-{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
+{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
 {{ config(tags = ['elementary-tests']) }}
 {{ elementary.test_table_anomalies(
     model=model,
@@ -18,7 +18,8 @@
     detection_delay=detection_delay,
     anomaly_exclude_metrics=anomaly_exclude_metrics,
     detection_period=detection_period,
-    training_period=training_period
+    training_period=training_period,
+    exclude_detection_period_from_training=exclude_detection_period_from_training
 ) }}
 {% endtest %}
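
For reviewers, a minimal sketch of how the new flag would be enabled from a dbt schema file, assuming the standard Elementary test config syntax; the model and column names below are hypothetical examples, not part of this change:

```yaml
# Hypothetical schema.yml usage (model and column names are examples only)
models:
  - name: user_events
    tests:
      - elementary.event_freshness_anomalies:
          event_timestamp_column: event_timestamp  # when the event occurred
          update_timestamp_column: updated_at      # when the row was loaded/updated
          backfill_days: 7
          # New flag from this change: keep the detection period (the last
          # backfill_days) out of the training set, so an ongoing freshness
          # regression cannot inflate its own baseline and mask itself.
          exclude_detection_period_from_training: true
```

With the default (false), buckets in the detection period also feed the training metrics, which is exactly the miss demonstrated by the first assertion in the integration test above.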