Merged
18 commits
53870ce
Add exclude_detection_period_from_training flag to volume_anomalies test
arbiv Oct 20, 2025
d3a8597
Fix test values for exclude_detection_period_from_training test
devin-ai-integration[bot] Oct 21, 2025
8a409d1
Add exclude_detection_period_from_training flag to freshness tests
devin-ai-integration[bot] Oct 21, 2025
50f9e61
Adjust test values to make anomalies more detectable
devin-ai-integration[bot] Oct 21, 2025
2805b51
Fix test failures: add exclude_detection_period_from_training to expe…
devin-ai-integration[bot] Oct 21, 2025
490de30
Fix exclude_detection_period_from_training implementation
devin-ai-integration[bot] Oct 21, 2025
642f015
Remove freshness anomalies support from this PR
devin-ai-integration[bot] Oct 21, 2025
3b9ae55
Remove freshness test functions from test files
devin-ai-integration[bot] Oct 21, 2025
40a5f36
Remove package-lock.yml from PR
devin-ai-integration[bot] Oct 22, 2025
8b5791d
Remove 'NEW:' references and clean up obvious comments
devin-ai-integration[bot] Oct 22, 2025
57e7b7c
Address PR review comments: rename column, add validation comment, se…
devin-ai-integration[bot] Oct 23, 2025
9b9f435
Improve comment: clarify backfill_days definition and configuration
devin-ai-integration[bot] Oct 26, 2025
e2a11d9
Add Redshift-specific override for edr_multi_value_in to fix multi-co…
devin-ai-integration[bot] Oct 26, 2025
ecc6a80
Fix whitespace handling in redshift__edr_multi_value_in to prevent sy…
devin-ai-integration[bot] Oct 26, 2025
9ddf019
Replace Redshift EXISTS with CONCAT approach to avoid correlated subq…
devin-ai-integration[bot] Oct 26, 2025
e4f5c73
Fix Redshift PARTITION BY error by excluding constant bucket_seasonal…
devin-ai-integration[bot] Oct 26, 2025
842a255
Add explanatory comment to redshift__edr_multi_value_in macro
devin-ai-integration[bot] Oct 27, 2025
961ad2a
Add explanatory comment for partition_by_keys dynamic PARTITION BY logic
devin-ai-integration[bot] Oct 27, 2025
1 change: 1 addition & 0 deletions integration_tests/tests/test_anomaly_test_configuration.py
@@ -88,6 +88,7 @@ def get_value(key: str):
"freshness_column": None, # Deprecated
"dimensions": None, # should only be set at the test level,
"exclude_final_results": get_value("exclude_final_results"),
"exclude_detection_period_from_training": None,
}


85 changes: 85 additions & 0 deletions integration_tests/tests/test_volume_anomalies.py
@@ -534,3 +534,88 @@ def test_anomalyless_vol_anomalies_with_test_materialization(
        test_vars={"enable_elementary_test_materialization": True},
    )
    assert test_result["status"] == "pass"


# Test for exclude_detection_period_from_training functionality
# This test demonstrates the use case where:
# 1. Detection period contains anomalous data that would normally be included in training
# 2. With exclude_detection_period_from_training=False: anomaly is missed (test passes) because training includes the anomaly
# 3. With exclude_detection_period_from_training=True: anomaly is detected (test fails) because training excludes the anomaly
@pytest.mark.skip_targets(["clickhouse"])
def test_exclude_detection_from_training(test_id: str, dbt_project: DbtProject):
    """
    Test the exclude_detection_period_from_training flag functionality.

    Scenario:
    - 30 days of normal data with variance (98, 100, 102 rows per day pattern)
    - 7 days of anomalous data (114 rows per day) in the detection period
    - Without exclusion: the anomaly is included in the training baseline, so the test passes (anomaly missed)
    - With exclusion: the anomaly is excluded from training, so the test fails (anomaly detected)
    """
    utc_now = datetime.utcnow()

    # Generate 30 days of normal data with variance (98, 100, 102 pattern)
    normal_pattern = [98, 100, 102]
    normal_data = []
    for i in range(30):
        date = utc_now - timedelta(days=37 - i)
        rows_per_day = normal_pattern[i % 3]
        normal_data.extend(
            [
                {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
                for _ in range(rows_per_day)
            ]
        )

    # Generate 7 days of anomalous data (114 rows per day) that falls in the detection period
    anomalous_data = []
    for i in range(7):
        date = utc_now - timedelta(days=7 - i)
        anomalous_data.extend(
            [
                {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
                for _ in range(114)  # 14% increase over the ~100-row mean
            ]
        )

    all_data = normal_data + anomalous_data

    # Test 1: WITHOUT exclusion (should pass - the anomaly is absorbed into training)
    test_args_without_exclusion = {
        **DBT_TEST_ARGS,
        "training_period": {"period": "day", "count": 30},
        "detection_period": {"period": "day", "count": 7},
        "time_bucket": {"period": "day", "count": 1},
        "sensitivity": 5,  # Wider threshold (more standard deviations) so the contaminated baseline absorbs the anomaly
        # exclude_detection_period_from_training is not set (defaults to False/None)
    }

    test_result_without_exclusion = dbt_project.test(
        test_id + "_without_exclusion",
        DBT_TEST_NAME,
        test_args_without_exclusion,
        data=all_data,
    )

    # This should PASS because the anomaly is included in training, making it part of the baseline
    assert (
        test_result_without_exclusion["status"] == "pass"
    ), "Test should pass when anomaly is included in training"

    # Test 2: WITH exclusion (should fail - the anomaly is excluded from training and therefore detected)
    test_args_with_exclusion = {
        **test_args_without_exclusion,
        "exclude_detection_period_from_training": True,
    }

    test_result_with_exclusion = dbt_project.test(
        test_id + "_with_exclusion",
        DBT_TEST_NAME,
        test_args_with_exclusion,
        data=all_data,
    )

    # This should FAIL because the anomaly is excluded from training, so it is detected as anomalous
    assert (
        test_result_with_exclusion["status"] == "fail"
    ), "Test should fail when anomaly is excluded from training"
@@ -17,20 +17,50 @@

{%- if test_configuration.seasonality == 'day_of_week' %}
{%- set bucket_seasonality_expr = elementary.edr_day_of_week_expression('bucket_end') %}
{%- set has_seasonality = true %}

{%- elif test_configuration.seasonality == 'hour_of_day' %}
{%- set bucket_seasonality_expr = elementary.edr_hour_of_day_expression('bucket_end') %}
{%- set has_seasonality = true %}

{%- elif test_configuration.seasonality == 'hour_of_week' %}
{%- set bucket_seasonality_expr = elementary.edr_hour_of_week_expression('bucket_end') %}
{%- set has_seasonality = true %}

{%- else %}
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
{%- set has_seasonality = false %}
{%- endif %}

{# Build PARTITION BY clause for window functions dynamically to work around Redshift limitation.

Redshift doesn't allow constant expressions in PARTITION BY of window functions. When seasonality
is not configured, bucket_seasonality becomes a constant ('no_seasonality'::text), which triggers
the error "constant expressions are not supported in partition by clauses."

We build the partition keys dynamically, always including the core metric keys and only appending
bucket_seasonality when it's computed from timestamps (has_seasonality = true). Partitioning by
a constant has no effect anyway, so this preserves behavior while keeping Redshift happy. #}
{%- set partition_by_keys = "metric_name, full_table_name, column_name, dimension, dimension_value" %}
{%- if has_seasonality %}
{%- set partition_by_keys = partition_by_keys ~ ", bucket_seasonality" %}
{%- endif %}
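
For illustration, a sketch of the two renderings this yields in the window functions below (expressions taken from this macro; not additional generated code):

-- seasonality configured (e.g. day_of_week): bucket_seasonality varies per row, so it is kept
avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row)

-- no seasonality: bucket_seasonality would be the constant 'no_seasonality', so it is omitted
avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value order by bucket_end asc rows between unbounded preceding and current row)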

{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_end)) %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}

{# Calculate detection period start for exclusion logic.
backfill_days defines the window of recent data to test for anomalies on each run.
It defaults to 2 days (configurable via vars.backfill_days or test-level parameter).
The detection period spans from (detection_end - backfill_days) to detection_end.
When exclude_detection_period_from_training is enabled, metrics in this detection period
are excluded from training statistics to prevent contamination from potentially anomalous data. #}
{%- if test_configuration.exclude_detection_period_from_training %}
{%- set detection_period_start = (detection_end - modules.datetime.timedelta(days=test_configuration.backfill_days)) %}
Review comment (Contributor): validate this calculation
{%- set detection_period_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_datetime_to_sql(detection_period_start)) %}
{%- endif %}

{# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the
bucket end (which is the actual time of the test) #}
{%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %}
@@ -142,6 +172,12 @@
bucket_end,
{{ bucket_seasonality_expr }} as bucket_seasonality,
{{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded,
{# Flag detection period metrics for exclusion from training #}
{% if test_configuration.exclude_detection_period_from_training %}
bucket_end > {{ detection_period_start_expr }}
{% else %}
FALSE
{% endif %} as should_exclude_from_training,
bucket_duration_hours,
updated_at
from grouped_metrics_duplicates
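
For intuition, a sketch of how this column could compile under assumed values (detection_end = 2025-10-27 00:00:00 and backfill_days = 2 are illustrative, not from the PR):

-- exclude_detection_period_from_training: true
bucket_end > cast('2025-10-25 00:00:00' as timestamp) as should_exclude_from_training,

-- exclude_detection_period_from_training: false (the default)
FALSE as should_exclude_from_training,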
@@ -164,14 +200,15 @@
bucket_seasonality,
bucket_duration_hours,
updated_at,
avg(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
{{ elementary.standard_deviation('metric_value') }} over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
count(metric_value) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
should_exclude_from_training,
avg(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_avg,
{{ elementary.standard_deviation('case when not should_exclude_from_training then metric_value end') }} over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_stddev,
count(case when not should_exclude_from_training then metric_value end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_set_size,
last_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) training_end,
first_value(case when not should_exclude_from_training then bucket_end end) over (partition by {{ partition_by_keys }} order by bucket_end asc rows between unbounded preceding and current row) as training_start
from grouped_metrics
where not is_excluded
{{ dbt_utils.group_by(13) }}
{{ dbt_utils.group_by(14) }}
),

anomaly_scores as (
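The CASE WHEN wrapping above relies on SQL aggregates skipping NULLs: detection-period rows contribute NULL to the window, so they drop out of the rolling training statistics while remaining present as rows to be scored. A minimal generic-SQL illustration (Postgres-style VALUES clause, not part of the PR):

select
  avg(case when not excluded then v end) as training_avg,       -- 100.0; the flagged 114 is ignored
  count(case when not excluded then v end) as training_set_size -- 2
from (values (100, false), (100, false), (114, true)) as t(v, excluded);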
@@ -23,7 +23,8 @@
anomaly_exclude_metrics,
detection_period,
training_period,
exclude_final_results) %}
exclude_final_results,
exclude_detection_period_from_training) %}

{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
{# Changes in these configs impact the metric id of the test. #}
@@ -53,6 +54,7 @@

{% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %}
{% set exclude_final_results = elementary.get_exclude_final_results(exclude_final_results) %}
{% set exclude_detection_period_from_training = elementary.get_test_argument('exclude_detection_period_from_training', exclude_detection_period_from_training, model_graph_node) %}

{% set test_configuration =
{'timestamp_column': metric_props.timestamp_column,
@@ -71,7 +73,8 @@
'fail_on_zero': fail_on_zero,
'detection_delay': detection_delay,
'anomaly_exclude_metrics': anomaly_exclude_metrics,
'exclude_final_results': exclude_final_results
'exclude_final_results': exclude_final_results,
'exclude_detection_period_from_training': exclude_detection_period_from_training
} %}
{%- set test_configuration = elementary.undefined_dict_keys_to_none(test_configuration) -%}
{%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}
5 changes: 3 additions & 2 deletions macros/edr/tests/test_table_anomalies.sql
@@ -1,4 +1,4 @@
{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none) %}
{% test table_anomalies(model, table_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, mandatory_params=none, event_timestamp_column=none, freshness_column=none, sensitivity=none, ignore_small_changes={"spike_failure_percent_threshold": none, "drop_failure_percent_threshold": none}, fail_on_zero=false, detection_delay=none, anomaly_exclude_metrics=none, detection_period=none, training_period=none, exclude_detection_period_from_training=false) %}
{{ config(tags = ['elementary-tests']) }}
{%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
{% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
@@ -37,7 +37,8 @@
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics,
detection_period=detection_period,
training_period=training_period) %}
training_period=training_period,
exclude_detection_period_from_training=exclude_detection_period_from_training) %}

{% if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
5 changes: 3 additions & 2 deletions macros/edr/tests/test_volume_anomalies.sql
@@ -1,4 +1,4 @@
{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period) %}
{% test volume_anomalies(model, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, ignore_small_changes, fail_on_zero, detection_delay, anomaly_exclude_metrics, detection_period, training_period, exclude_detection_period_from_training=false) %}
{{ config(tags = ['elementary-tests']) }}

{{ elementary.test_table_anomalies(
Expand All @@ -20,7 +20,8 @@
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics,
detection_period=detection_period,
training_period=training_period
training_period=training_period,
exclude_detection_period_from_training=exclude_detection_period_from_training
)
}}
{% endtest %}
23 changes: 23 additions & 0 deletions macros/utils/cross_db_utils/multi_value_in.sql
@@ -32,3 +32,26 @@
from {{ target_table }}
)
{%- endmacro -%}

{%- macro redshift__edr_multi_value_in(source_cols, target_cols, target_table) -%}
{# Redshift doesn't support multi-column IN subqueries (tuple IN) like:
(col1, col2) IN (SELECT col1, col2 FROM table)

This limitation causes the error: "This type of IN/NOT IN query is not supported yet"

To work around this, we use CONCAT to combine multiple columns into a single scalar
value on both sides of the IN comparison, similar to the BigQuery implementation.
This maintains the same semantics while avoiding Redshift's tuple IN limitation. #}
concat(
{%- for val in source_cols -%}
{{ elementary.edr_cast_as_string(val) -}}
{%- if not loop.last %}, {% endif %}
{%- endfor %}
) in (
select concat({%- for val in target_cols -%}
{{ elementary.edr_cast_as_string(val) -}}
{%- if not loop.last %}, {% endif %}
{%- endfor %})
from {{ target_table }}
)
{%- endmacro -%}
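
As a sketch, with two hypothetical columns (col_a, col_b) and assuming edr_cast_as_string renders a varchar cast on Redshift, the macro compiles to roughly:

concat(cast(col_a as varchar), cast(col_b as varchar)) in (
  select concat(cast(col_a as varchar), cast(col_b as varchar))
  from some_target_table
)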