Skip to content
102 changes: 102 additions & 0 deletions integration_tests/tests/test_event_freshness_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,105 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject):
test_vars={"custom_run_started_at": test_started_at.isoformat()},
)
assert result["status"] == "fail"


# Anomalies currently not supported on ClickHouse
@pytest.mark.skip_targets(["clickhouse"])
def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
    """
    Test the exclude_detection_period_from_training flag for event freshness anomalies.

    Scenario:
    - 7 days of normal data (2-8 minute lag between event and update) - training period
    - 7 days of anomalous data (5 hour lag) - detection period
    - Without exclusion: anomaly gets included in training baseline, test passes (misses anomaly)
    - With exclusion: anomaly excluded from training, test fails (detects anomaly)

    The same seed data is tested twice; the only difference between the two
    runs is the exclude_detection_period_from_training argument.
    """
    utc_now = datetime.utcnow()
    # Anchor the run at the next midnight so every generated event falls on a
    # whole day before the (overridden) run start time.
    test_started_at = (utc_now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    def build_row(days_before_start: int, lag: timedelta) -> dict:
        """Return one seed row: an event at noon N days before the run, updated `lag` later."""
        event_time = (test_started_at - timedelta(days=days_before_start)).replace(
            hour=12, minute=0, second=0, microsecond=0
        )
        update_time = event_time + lag
        return {
            EVENT_TIMESTAMP_COLUMN: event_time.strftime(DATE_FORMAT),
            UPDATE_TIMESTAMP_COLUMN: update_time.strftime(DATE_FORMAT),
        }

    # Training period (days 14 through 8 before the run): the lag varies
    # between 2 and 8 minutes to ensure training_stddev > 0.
    training_lags_minutes = [2, 3, 4, 5, 6, 7, 8]
    normal_data = [
        build_row(14 - offset, timedelta(minutes=lag_minutes))
        for offset, lag_minutes in enumerate(training_lags_minutes)
    ]

    # Detection period (days 7 through 1 before the run): constant 5-hour lag.
    anomalous_data = [build_row(7 - offset, timedelta(hours=5)) for offset in range(7)]

    all_data = normal_data + anomalous_data

    # Both runs share the same vars; each call receives its own copy.
    run_vars = {
        "custom_run_started_at": test_started_at.isoformat(),
        "force_metrics_backfill": True,
    }

    # Test 1: WITHOUT exclusion (should pass - misses the anomaly because it's included in training)
    test_args_without_exclusion = {
        "event_timestamp_column": EVENT_TIMESTAMP_COLUMN,
        "update_timestamp_column": UPDATE_TIMESTAMP_COLUMN,
        "days_back": 14,  # Scoring window: 14 days to include both training and detection
        "backfill_days": 7,  # Detection period: last 7 days (days 7 through 1 before test_started_at)
        "time_bucket": {
            "period": "day",
            "count": 1,
        },  # Daily buckets to avoid boundary issues
        "sensitivity": 3,
        "anomaly_direction": "spike",  # Explicit direction since we're testing increased lag
        "min_training_set_size": 5,  # Explicit minimum to avoid threshold issues
        # exclude_detection_period_from_training is not set (defaults to False/None)
    }

    test_result_without_exclusion = dbt_project.test(
        test_id + "_without_exclusion",
        TEST_NAME,
        test_args_without_exclusion,
        data=all_data,
        test_vars=dict(run_vars),
    )

    # This should PASS because the anomaly is included in training, making it part of the baseline
    assert (
        test_result_without_exclusion["status"] == "pass"
    ), "Test should pass when anomaly is included in training"

    # Test 2: WITH exclusion (should fail - detects the anomaly because it's excluded from training)
    test_args_with_exclusion = {
        **test_args_without_exclusion,
        "exclude_detection_period_from_training": True,
    }

    test_result_with_exclusion = dbt_project.test(
        test_id + "_with_exclusion",
        TEST_NAME,
        test_args_with_exclusion,
        data=all_data,
        test_vars=dict(run_vars),
    )

    # This should FAIL because the anomaly is excluded from training, so it's detected as anomalous
    assert (
        test_result_with_exclusion["status"] == "fail"
    ), "Test should fail when anomaly is excluded from training"
5 changes: 3 additions & 2 deletions macros/edr/tests/test_event_freshness_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
{% test event_freshness_anomalies(model, event_timestamp_column, update_timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
{{ config(tags = ['elementary-tests']) }}
{% if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
{% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
Expand Down Expand Up @@ -32,7 +32,8 @@
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics,
detection_period=detection_period,
training_period=training_period
training_period=training_period,
exclude_detection_period_from_training=exclude_detection_period_from_training
)
}}
{% endtest %}
5 changes: 3 additions & 2 deletions macros/edr/tests/test_freshness_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
{% test freshness_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
{{ config(tags = ['elementary-tests']) }}
{{ elementary.test_table_anomalies(
model=model,
Expand All @@ -18,7 +18,8 @@
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics,
detection_period=detection_period,
training_period=training_period
training_period=training_period,
exclude_detection_period_from_training=exclude_detection_period_from_training
)
}}
{% endtest %}