diff --git a/macros/edr/tests/test_data_freshness_sla.sql b/macros/edr/tests/test_data_freshness_sla.sql
new file mode 100644
index 000000000..e6b2eeac3
--- /dev/null
+++ b/macros/edr/tests/test_data_freshness_sla.sql
@@ -0,0 +1,243 @@
+{#
+  Test: data_freshness_sla
+
+  Verifies that data in a model was updated before a specified SLA deadline time.
+  Checks the max timestamp value of a specified column in the data itself.
+
+  Use case: "Is the data fresh?" / "Was the data updated on time?"
+
+  Parameters:
+    timestamp_column (required): Column name containing timestamps to check for freshness
+    sla_time (required): Deadline time. Supports formats like "07:00", "7am", "2:30pm", "14:30"
+    timezone (required): IANA timezone name (e.g., "America/Los_Angeles", "Europe/London")
+    day_of_week (optional): Day(s) to check. String or list: "Monday", ["Monday", "Wednesday"]
+    day_of_month (optional): Day(s) of month to check. Integer or list: 1, [1, 15]
+    where_expression (optional): Additional WHERE clause filter for the data query
+
+  Schedule behavior:
+    - If neither day_of_week nor day_of_month is set: check every day (default)
+    - If day_of_week is set: only check on those days
+    - If day_of_month is set: only check on those days
+    - If both are set: check if today matches EITHER filter (OR logic)
+
+  Example usage:
+    models:
+      - name: my_model
+        tests:
+          - elementary.data_freshness_sla:
+              timestamp_column: updated_at
+              sla_time: "07:00"
+              timezone: "America/Los_Angeles"
+
+      - name: daily_events
+        tests:
+          - elementary.data_freshness_sla:
+              timestamp_column: event_timestamp
+              sla_time: "6am"
+              timezone: "Europe/Amsterdam"
+              where_expression: "event_type = 'completed'"
+
+      - name: weekly_report_data
+        tests:
+          - elementary.data_freshness_sla:
+              timestamp_column: report_date
+              sla_time: "09:00"
+              timezone: "Asia/Tokyo"
+              day_of_week: ["Monday"]
+
+  Test passes if:
+    - Today is not a scheduled check day (based on day_of_week/day_of_month)
+    - OR the max timestamp in the data is from today (before or after deadline)
+    - OR the SLA deadline for today hasn't passed yet
+
+  Test fails if:
+    - Today is a scheduled check day AND the deadline has passed AND:
+      - No data exists in the table
+      - The max timestamp is from a previous day (data not updated today)
+
+  Important:
+    - The timestamp_column values are assumed to be in UTC (or timezone-naive timestamps
+      that represent UTC). If your data stores local timestamps, the comparison against
+      the SLA deadline (converted to UTC) will be incorrect.
+#}
+
+{% test data_freshness_sla(model, timestamp_column, sla_time, timezone, day_of_week=none, day_of_month=none, where_expression=none) %}
+    {{ config(tags=['elementary-tests']) }}
+
+    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
+
+        {# Validate required parameters #}
+        {% if not timestamp_column %}
+            {{ exceptions.raise_compiler_error("The 'timestamp_column' parameter is required. Example: timestamp_column: 'updated_at'") }}
+        {% endif %}
+
+        {% if not sla_time %}
+            {{ exceptions.raise_compiler_error("The 'sla_time' parameter is required. Example: sla_time: '07:00'") }}
+        {% endif %}
+
+        {# Validate timezone #}
+        {% do elementary.validate_timezone(timezone) %}
+
+        {# Normalize and validate day filters #}
+        {% set day_of_week_filter = elementary.normalize_day_of_week(day_of_week) %}
+        {% set day_of_month_filter = elementary.normalize_day_of_month(day_of_month) %}
+
+        {# Get model relation and validate #}
+        {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
+        {% if not model_relation %}
+            {{ exceptions.raise_compiler_error("Unsupported model: " ~ model ~ " (this might happen if you override 'ref' or 'source')") }}
+        {% endif %}
+
+        {%- if elementary.is_ephemeral_model(model_relation) %}
+            {{ exceptions.raise_compiler_error("Test not supported for ephemeral models: " ~ model_relation.identifier) }}
+        {%- endif %}
+
+        {# Validate timestamp column exists and is a timestamp type #}
+        {% set timestamp_column_data_type = elementary.find_normalized_data_type_for_column(model_relation, timestamp_column) %}
+        {% if not elementary.is_column_timestamp(model_relation, timestamp_column, timestamp_column_data_type) %}
+            {{ exceptions.raise_compiler_error("Column '" ~ timestamp_column ~ "' is not a timestamp type. The timestamp_column must be a timestamp or datetime column.") }}
+        {% endif %}
+
+        {# Parse the SLA time #}
+        {% set parsed_time = elementary.parse_sla_time(sla_time) %}
+        {% set formatted_sla_time = elementary.format_sla_time(parsed_time) %}
+
+        {# Calculate SLA deadline in UTC (also returns current day info) #}
+        {% set sla_info = elementary.calculate_sla_deadline_utc(parsed_time.hour, parsed_time.minute, timezone) %}
+
+        {# Check if today is a scheduled check day #}
+        {% set should_check = elementary.should_check_sla_today(
+            sla_info.day_of_week,
+            sla_info.day_of_month,
+            day_of_week_filter,
+            day_of_month_filter
+        ) %}
+
+        {# If today is not a scheduled check day, skip (pass) #}
+        {% if not should_check %}
+            {{ elementary.edr_log('Skipping data_freshness_sla test for ' ~ model_relation.identifier ~ ' - not a scheduled check day (' ~ sla_info.day_of_week ~ ', day ' ~ sla_info.day_of_month ~ ')') }}
+            {{ elementary.no_results_query() }}
+        {% else %}
+
+            {{ elementary.edr_log('Running data_freshness_sla test for ' ~ model_relation.identifier ~ ' with SLA ' ~ formatted_sla_time ~ ' ' ~ timezone) }}
+
+            {# Build the query #}
+            {{ elementary.get_data_freshness_sla_query(
+                model_relation=model_relation,
+                timestamp_column=timestamp_column,
+                sla_deadline_utc=sla_info.sla_deadline_utc,
+                target_date=sla_info.target_date,
+                target_date_start_utc=sla_info.target_date_start_utc,
+                target_date_end_utc=sla_info.target_date_end_utc,
+                deadline_passed=sla_info.deadline_passed,
+                formatted_sla_time=formatted_sla_time,
+                timezone=timezone,
+                where_expression=where_expression
+            ) }}
+
+        {% endif %}
+
+    {%- else %}
+        {{ elementary.no_results_query() }}
+    {%- endif %}
+
+{% endtest %}
+
+
+{#
+  Build SQL query to check if data was updated before SLA deadline.
+
+  Logic:
+    - Query the model table to get MAX(timestamp_column)
+    - Compare against today's date boundaries (computed in UTC at compile time)
+    - If max timestamp is from today or later: data is fresh, SLA met
+    - If deadline hasn't passed yet: Don't fail (still time)
+    - Otherwise: Data is stale, SLA missed
+#}
+{% macro get_data_freshness_sla_query(model_relation, timestamp_column, sla_deadline_utc, target_date, target_date_start_utc, target_date_end_utc, deadline_passed, formatted_sla_time, timezone, where_expression) %}
+
+    with
+
+    sla_deadline as (
+        select
+            {{ elementary.edr_cast_as_timestamp("'" ~ sla_deadline_utc ~ "'") }} as deadline_utc,
+            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_start_utc ~ "'") }} as target_date_start_utc,
+            {{ elementary.edr_cast_as_timestamp("'" ~ target_date_end_utc ~ "'") }} as target_date_end_utc,
+            '{{ target_date }}' as target_date
+    ),
+
+    {# Get the max timestamp from the data #}
+    max_data_timestamp as (
+        select
+            max({{ elementary.edr_cast_as_timestamp(timestamp_column) }}) as max_timestamp
+        from {{ model_relation }}
+        {% if where_expression %}
+            where {{ where_expression }}
+        {% endif %}
+    ),
+
+    {# Determine freshness status #}
+    freshness_result as (
+        select
+            sd.target_date,
+            sd.deadline_utc as sla_deadline_utc,
+            mdt.max_timestamp,
+            case
+                when mdt.max_timestamp is null then 'NO_DATA'
+                {# Data from today or future (e.g. pre-loaded records) counts as fresh #}
+                when mdt.max_timestamp >= sd.target_date_start_utc then 'DATA_FRESH'
+                else 'DATA_STALE'
+            end as freshness_status
+        from sla_deadline sd
+        cross join max_data_timestamp mdt
+    ),
+
+    final_result as (
+        select
+            '{{ model_relation.identifier }}' as model_name,
+            target_date,
+            '{{ formatted_sla_time }}' as sla_time,
+            '{{ timezone }}' as timezone,
+            cast(sla_deadline_utc as {{ elementary.edr_type_string() }}) as sla_deadline_utc,
+            freshness_status,
+            cast(max_timestamp as {{ elementary.edr_type_string() }}) as max_timestamp,
+            case
+                when freshness_status = 'DATA_FRESH' then false
+                {% if not deadline_passed %}
+                else false
+                {% else %}
+                else true
+                {% endif %}
+            end as is_failure,
+            case
+                when freshness_status = 'NO_DATA' then
+                    'No data found in "{{ model_relation.identifier }}"' ||
+                    {% if where_expression %}
+                    ' (with filter: {{ where_expression | replace("'", "''") }})' ||
+                    {% endif %}
+                    '. Expected data to be updated before {{ formatted_sla_time }} {{ timezone }}.'
+                when freshness_status = 'DATA_STALE' then
+                    'Data in "{{ model_relation.identifier }}" is stale. Last update was at ' ||
+                    cast(max_timestamp as {{ elementary.edr_type_string() }}) ||
+                    ', which is before today. Expected fresh data before {{ formatted_sla_time }} {{ timezone }}.'
+                else
+                    'Data in "{{ model_relation.identifier }}" is fresh - last update at ' ||
+                    cast(max_timestamp as {{ elementary.edr_type_string() }}) ||
+                    ' (before SLA deadline {{ formatted_sla_time }} {{ timezone }}).'
+            end as result_description
+        from freshness_result
+    )
+
+    select
+        model_name,
+        target_date,
+        sla_time,
+        timezone,
+        sla_deadline_utc,
+        freshness_status,
+        max_timestamp,
+        result_description
+    from final_result
+    where is_failure = true
+
+{% endmacro %}
diff --git a/macros/edr/tests/test_volume_threshold.sql b/macros/edr/tests/test_volume_threshold.sql
new file mode 100644
index 000000000..2138c9b1c
--- /dev/null
+++ b/macros/edr/tests/test_volume_threshold.sql
@@ -0,0 +1,195 @@
+{#
+  Test: volume_threshold
+
+  Monitors row count changes using percentage thresholds with multiple severity levels.
+  Uses Elementary's metric caching infrastructure to avoid recalculating row counts
+  for buckets that have already been computed.
+
+  Parameters:
+    timestamp_column (required): Column to determine time periods
+    warn_threshold_percent (optional): % change that triggers warning (default: 5)
+    error_threshold_percent (optional): % change that triggers error (default: 10)
+    direction (optional): 'both', 'spike', or 'drop' (default: 'both')
+    time_bucket (optional): Time bucket config, e.g. {period: 'day', count: 1}
+    where_expression (optional): Additional WHERE filter
+    days_back (optional): Days of history to keep (default: 14)
+    backfill_days (optional): Days to backfill on each run (default: 2)
+    min_row_count (optional): Min baseline rows in previous bucket to trigger check (default: 100)
+
+  Example:
+    - elementary.volume_threshold:
+        timestamp_column: created_at
+        warn_threshold_percent: 5
+        error_threshold_percent: 10
+        direction: both
+#}
+
+{% test volume_threshold(model,
+                         timestamp_column,
+                         warn_threshold_percent=5,
+                         error_threshold_percent=10,
+                         direction='both',
+                         time_bucket=none,
+                         where_expression=none,
+                         days_back=14,
+                         backfill_days=2,
+                         min_row_count=100) %}
+    {{ config(tags=['elementary-tests'], fail_calc='max(severity_level)', warn_if='>=1', error_if='>=2') }}
+
+    {%- if execute and elementary.is_test_command() and elementary.is_elementary_enabled() %}
+
+        {% if warn_threshold_percent > error_threshold_percent %}
+            {{ exceptions.raise_compiler_error("warn_threshold_percent cannot exceed error_threshold_percent") }}
+        {% endif %}
+        {% if direction not in ['both', 'spike', 'drop'] %}
+            {{ exceptions.raise_compiler_error("direction must be 'both', 'spike', or 'drop'") }}
+        {% endif %}
+
+        {% set model_relation = elementary.get_model_relation_for_test(model, elementary.get_test_model()) %}
+        {% if not model_relation %}
+            {{ exceptions.raise_compiler_error("Unsupported model: " ~ model) }}
+        {% endif %}
+
+        {%- if elementary.is_ephemeral_model(model_relation) %}
+            {{ exceptions.raise_compiler_error("Test not supported for ephemeral models: " ~ model_relation.identifier) }}
+        {%- endif %}
+
+        {# Collect row_count metrics using Elementary's shared infrastructure.
+           This handles: incremental bucket detection, metric computation, temp table creation, cache storage.
+           Pass time_bucket as-is (none = use model/project default via get_time_bucket). #}
+        {% set table_metrics = [{"type": "row_count", "name": "row_count"}] %}
+        {% do elementary.collect_table_metrics(
+            table_metrics=table_metrics,
+            model_expr=model,
+            model_relation=model_relation,
+            timestamp_column=timestamp_column,
+            time_bucket=time_bucket,
+            days_back=days_back,
+            backfill_days=backfill_days,
+            where_expression=where_expression,
+            dimensions=[]
+        ) %}
+
+        {# Build metric_properties to match the filter used by collect_table_metrics.
+           This must produce the same dict so our data_monitoring_metrics query matches. #}
+        {% set model_graph_node = elementary.get_model_graph_node(model_relation) %}
+        {% set metric_props = elementary.get_metric_properties(model_graph_node, timestamp_column, where_expression, time_bucket, []) %}
+
+        {# Compare current vs previous bucket, combining cached history + newly computed metrics #}
+        {{ elementary.get_volume_threshold_comparison_query(
+            model_relation=model_relation,
+            metric_props=metric_props,
+            warn_threshold_percent=warn_threshold_percent,
+            error_threshold_percent=error_threshold_percent,
+            direction=direction,
+            min_row_count=min_row_count
+        ) }}
+
+    {%- else %}
+        {{ elementary.no_results_query() }}
+    {%- endif %}
+{% endtest %}
+
+
+{% macro get_volume_threshold_comparison_query(model_relation, metric_props, warn_threshold_percent,
+                                               error_threshold_percent, direction, min_row_count) %}
+
+    {% set data_monitoring_metrics_table = elementary.get_elementary_relation('data_monitoring_metrics') %}
+    {% set test_metrics_table = elementary.get_elementary_test_table(elementary.get_elementary_test_table_name(), 'metrics') %}
+    {% set full_table_name = elementary.relation_to_full_name(model_relation) %}
+
+    with
+
+    {# Union persisted history with newly computed metrics from this run #}
+    all_metrics as (
+        select bucket_start, bucket_end, metric_value as row_count, 1 as source_priority
+        from {{ data_monitoring_metrics_table }}
+        where upper(full_table_name) = upper('{{ full_table_name }}')
+          and lower(metric_name) = 'row_count'
+          and metric_properties = {{ elementary.dict_to_quoted_json(metric_props) }}
+
+        union all
+
+        select bucket_start, bucket_end, metric_value as row_count, 0 as source_priority
+        from {{ test_metrics_table }}
+    ),
+
+    {# Deduplicate: prefer freshly computed metrics (priority 0) over cached history (priority 1) #}
+    ranked_metrics as (
+        select bucket_start, bucket_end, row_count,
+               row_number() over (partition by bucket_start, bucket_end order by source_priority asc) as rn
+        from all_metrics
+    ),
+
+    metrics as (
+        select bucket_start, bucket_end, row_count
+        from ranked_metrics
+        where rn = 1
+    ),
+
+    current_bucket as (
+        select bucket_start, bucket_end, row_count
+        from metrics
+        order by bucket_end desc
+        limit 1
+    ),
+
+    previous_bucket as (
+        select bucket_start, bucket_end, row_count
+        from metrics
+        where bucket_end <= (select bucket_start from current_bucket)
+        order by bucket_end desc
+        limit 1
+    ),
+
+    comparison as (
+        select
+            curr.bucket_end as current_period,
+            prev.bucket_end as previous_period,
+            {{ elementary.edr_cast_as_int('curr.row_count') }} as current_row_count,
+            {{ elementary.edr_cast_as_int('prev.row_count') }} as previous_row_count,
+            case
+                when prev.row_count is null or prev.row_count = 0 then null
+                else round((curr.row_count - prev.row_count) * 100.0 / prev.row_count, 2)
+            end as percent_change
+        from current_bucket curr
+        left join previous_bucket prev on 1=1
+    ),
+
+    result as (
+        select *,
+            case
+                when previous_row_count is null or previous_row_count < {{ min_row_count }} then 0
+                when percent_change is null then 0
+                {% if direction == 'both' %}
+                when abs(percent_change) >= {{ error_threshold_percent }} then 2
+                when abs(percent_change) >= {{ warn_threshold_percent }} then 1
+                {% elif direction == 'spike' %}
+                when percent_change >= {{ error_threshold_percent }} then 2
+                when percent_change >= {{ warn_threshold_percent }} then 1
+                {% else %}
+                when percent_change <= -{{ error_threshold_percent }} then 2
+                when percent_change <= -{{ warn_threshold_percent }} then 1
+                {% endif %}
+                else 0
+            end as severity_level
+        from comparison
+    )
+
+    select
+        '{{ model_relation.identifier }}' as model_name,
+        cast(current_period as {{ elementary.edr_type_string() }}) as current_period,
+        cast(previous_period as {{ elementary.edr_type_string() }}) as previous_period,
+        current_row_count,
+        previous_row_count,
+        current_row_count - previous_row_count as absolute_change,
+        percent_change,
+        severity_level,
+        case severity_level when 2 then 'error' when 1 then 'warn' else 'pass' end as severity_name,
+        'Row count changed by ' || cast(percent_change as {{ elementary.edr_type_string() }}) ||
+        '% (from ' || cast(previous_row_count as {{ elementary.edr_type_string() }}) ||
+        ' to ' || cast(current_row_count as {{ elementary.edr_type_string() }}) || ')' as result_description
+    from result
+    where severity_level > 0
+
+{% endmacro %}