Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 350 additions & 0 deletions integration_tests/tests/test_dbt_artifacts/test_test_owners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
"""
Integration tests for test owner attribution in dbt_tests artifact table.
Tests that test ownership is correctly extracted from the primary model only,
not aggregated from all parent models (CORE-196).
"""
import contextlib
import json
import uuid
from pathlib import Path
from typing import Generator, List, Optional, Union

import pytest
from dbt_project import DbtProject


@contextlib.contextmanager
def cleanup_file(path: Path) -> Generator[None, None, None]:
"""Context manager to clean up a file after the test."""
try:
yield
finally:
if path.exists():
path.unlink()


def _parse_model_owners(
model_owners_value: Optional[Union[str, List[str]]]
) -> List[str]:
"""
Parse the model_owners column value which may be a JSON string or list.
Returns a list of owner strings.
"""
if model_owners_value is None:
return []
if isinstance(model_owners_value, list):
return model_owners_value
if isinstance(model_owners_value, str):
if not model_owners_value or model_owners_value == "[]":
return []
try:
parsed = json.loads(model_owners_value)
return parsed if isinstance(parsed, list) else [parsed]
except json.JSONDecodeError:
return [model_owners_value]
return []


def _create_model_sql(owner: Optional[str] = None, columns: str = "1 as id") -> str:
"""
Create a dbt model SQL string with optional owner configuration.

Args:
owner: The owner to set in the model's meta config. If None, no config is added.
columns: The SELECT clause columns. Defaults to "1 as id".

Returns:
A dbt model SQL string.
"""
if owner is not None:
return f"""
{{{{ config(meta={{'owner': '{owner}'}}) }}}}
select {columns}
"""
return f"""
select {columns}
"""


def test_single_parent_test_owner_attribution(dbt_project: DbtProject) -> None:
"""
Test that a test on a single model correctly inherits the owner from that model.
This is the baseline case - single parent tests should have the parent's owner.
"""
unique_id = str(uuid.uuid4()).replace("-", "_")
model_name = f"model_single_owner_{unique_id}"
owner_name = "Alice"

model_sql = _create_model_sql(owner=owner_name)

schema_yaml = {
"version": 2,
"models": [
{
"name": model_name,
"description": "A model with a single owner for testing",
"columns": [{"name": "id", "tests": ["unique"]}],
}
],
}

dbt_model_path = dbt_project.models_dir_path / "tmp" / f"{model_name}.sql"
with cleanup_file(dbt_model_path):
with dbt_project.write_yaml(
schema_yaml, name=f"schema_single_owner_{unique_id}.yml"
):
dbt_model_path.parent.mkdir(parents=True, exist_ok=True)
dbt_model_path.write_text(model_sql)

dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.run(select=model_name)

tests = dbt_project.read_table(
"dbt_tests",
where=f"parent_model_unique_id LIKE '%{model_name}'",
raise_if_empty=True,
)

assert len(tests) == 1, f"Expected 1 test, got {len(tests)}"
test_row = tests[0]
model_owners = _parse_model_owners(test_row.get("model_owners"))

assert model_owners == [
owner_name
], f"Expected model_owners to be ['{owner_name}'], got {model_owners}"


@pytest.mark.skip_targets(["dremio"])
@pytest.mark.skip_for_dbt_fusion
def test_relationship_test_uses_primary_model_owner_only(
dbt_project: DbtProject,
) -> None:
"""
Test that a relationship test between two models with different owners
only uses the owner from the PRIMARY model (the one being tested),
not from the referenced model.

This is the key test for CORE-196 - previously owners were aggregated
from all parent models, now only the primary model's owner should be used.
"""
unique_id = str(uuid.uuid4()).replace("-", "_")
primary_model_name = f"model_primary_{unique_id}"
referenced_model_name = f"model_referenced_{unique_id}"
# Use explicit test name for reliable querying across dbt versions
test_name = f"rel_primary_owner_{unique_id}"
primary_owner = "Alice"
referenced_owner = "Bob"

primary_model_sql = _create_model_sql(
owner=primary_owner, columns="1 as id, 1 as ref_id"
)
referenced_model_sql = _create_model_sql(owner=referenced_owner)

schema_yaml = {
"version": 2,
"models": [
{
"name": primary_model_name,
"description": "Primary model with owner Alice",
"columns": [
{"name": "id"},
{
"name": "ref_id",
"tests": [
{
"relationships": {
"name": test_name,
"arguments": {
"to": f"ref('{referenced_model_name}')",
"field": "id",
},
}
}
],
},
],
},
{
"name": referenced_model_name,
"description": "Referenced model with owner Bob",
"columns": [{"name": "id"}],
},
],
}

primary_model_path = (
dbt_project.models_dir_path / "tmp" / f"{primary_model_name}.sql"
)
referenced_model_path = (
dbt_project.models_dir_path / "tmp" / f"{referenced_model_name}.sql"
)

with cleanup_file(primary_model_path), cleanup_file(referenced_model_path):
with dbt_project.write_yaml(
schema_yaml, name=f"schema_relationship_{unique_id}.yml"
):
primary_model_path.parent.mkdir(parents=True, exist_ok=True)
primary_model_path.write_text(primary_model_sql)
referenced_model_path.write_text(referenced_model_sql)

dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.run(
select=f"{primary_model_name} {referenced_model_name}"
)

# Query by explicit test name - more robust across dbt versions
tests = dbt_project.read_table(
"dbt_tests",
where=f"name LIKE '%{test_name}%'",
raise_if_empty=False,
)

assert (
len(tests) == 1
), f"Expected 1 relationship test with name containing '{test_name}', got {len(tests)}. Tests found: {[t.get('name') for t in tests]}"
test_row = tests[0]
model_owners = _parse_model_owners(test_row.get("model_owners"))

assert model_owners == [
primary_owner
], f"Expected model_owners to be ['{primary_owner}'] (primary model only), got {model_owners}. Referenced model owner '{referenced_owner}' should NOT be included."


@pytest.mark.skip_targets(["dremio"])
@pytest.mark.skip_for_dbt_fusion
def test_relationship_test_no_owner_on_primary_model(dbt_project: DbtProject) -> None:
"""
Test that when the primary model has no owner but the referenced model does,
the test should have empty model_owners (not inherit from referenced model).
"""
unique_id = str(uuid.uuid4()).replace("-", "_")
primary_model_name = f"model_no_owner_{unique_id}"
referenced_model_name = f"model_with_owner_{unique_id}"
# Use explicit test name for reliable querying across dbt versions
test_name = f"rel_no_owner_{unique_id}"
referenced_owner = "Bob"

primary_model_sql = _create_model_sql(owner=None, columns="1 as id, 1 as ref_id")
referenced_model_sql = _create_model_sql(owner=referenced_owner)

schema_yaml = {
"version": 2,
"models": [
{
"name": primary_model_name,
"description": "Primary model with NO owner",
"columns": [
{"name": "id"},
{
"name": "ref_id",
"tests": [
{
"relationships": {
"name": test_name,
"arguments": {
"to": f"ref('{referenced_model_name}')",
"field": "id",
},
}
}
],
},
],
},
{
"name": referenced_model_name,
"description": "Referenced model with owner Bob",
"columns": [{"name": "id"}],
},
],
}

primary_model_path = (
dbt_project.models_dir_path / "tmp" / f"{primary_model_name}.sql"
)
referenced_model_path = (
dbt_project.models_dir_path / "tmp" / f"{referenced_model_name}.sql"
)

with cleanup_file(primary_model_path), cleanup_file(referenced_model_path):
with dbt_project.write_yaml(
schema_yaml, name=f"schema_no_owner_{unique_id}.yml"
):
primary_model_path.parent.mkdir(parents=True, exist_ok=True)
primary_model_path.write_text(primary_model_sql)
referenced_model_path.write_text(referenced_model_sql)

dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.run(
select=f"{primary_model_name} {referenced_model_name}"
)

# Query by explicit test name - more robust across dbt versions
tests = dbt_project.read_table(
"dbt_tests",
where=f"name LIKE '%{test_name}%'",
raise_if_empty=False,
)

assert (
len(tests) == 1
), f"Expected 1 relationship test with name containing '{test_name}', got {len(tests)}. Tests found: {[t.get('name') for t in tests]}"
test_row = tests[0]
model_owners = _parse_model_owners(test_row.get("model_owners"))

assert (
model_owners == []
), f"Expected model_owners to be empty (primary model has no owner), got {model_owners}. Referenced model owner '{referenced_owner}' should NOT be inherited."


def test_owner_deduplication(dbt_project: DbtProject) -> None:
"""
Test that duplicate owners in a model's owner field are deduplicated.
For example, if owner is "Alice,Bob,Alice", the result should be ["Alice", "Bob"].
"""
unique_id = str(uuid.uuid4()).replace("-", "_")
model_name = f"model_dup_owner_{unique_id}"

model_sql = _create_model_sql(owner="Alice,Bob,Alice")

schema_yaml = {
"version": 2,
"models": [
{
"name": model_name,
"description": "A model with duplicate owners for testing deduplication",
"columns": [{"name": "id", "tests": ["unique"]}],
}
],
}

dbt_model_path = dbt_project.models_dir_path / "tmp" / f"{model_name}.sql"
with cleanup_file(dbt_model_path):
with dbt_project.write_yaml(
schema_yaml, name=f"schema_dup_owner_{unique_id}.yml"
):
dbt_model_path.parent.mkdir(parents=True, exist_ok=True)
dbt_model_path.write_text(model_sql)

dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.run(select=model_name)

tests = dbt_project.read_table(
"dbt_tests",
where=f"parent_model_unique_id LIKE '%{model_name}'",
raise_if_empty=True,
)

assert len(tests) == 1, f"Expected 1 test, got {len(tests)}"
test_row = tests[0]
model_owners = _parse_model_owners(test_row.get("model_owners"))

assert (
len(model_owners) == 2
), f"Expected 2 unique owners, got {len(model_owners)}: {model_owners}"
assert (
"Alice" in model_owners
), f"Expected 'Alice' in model_owners, got {model_owners}"
assert (
"Bob" in model_owners
), f"Expected 'Bob' in model_owners, got {model_owners}"
25 changes: 13 additions & 12 deletions macros/edr/dbt_artifacts/upload_dbt_tests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,11 @@
{% set test_models_tags = [] %}
{% for test_model_node in test_model_nodes %}
{% set flatten_test_model_node = elementary.flatten_node(test_model_node) %}
{% set test_model_owner = flatten_test_model_node.get('owner') %}
{% if test_model_owner %}
{% if test_model_owner is string %}
{% set owners = test_model_owner.split(',') %}
{% for owner in owners %}
{% do test_models_owners.append(owner | trim) %}
{% endfor %}
{% elif test_model_owner is iterable %}
{% do test_models_owners.extend(test_model_owner) %}
{% endif %}
{% endif %}
{% set test_model_tags = flatten_test_model_node.get('tags') %}
{% if test_model_tags and test_model_tags is sequence %}
{% do test_models_tags.extend(test_model_tags) %}
{% endif %}
{% endfor %}
{% set test_models_owners = test_models_owners | unique | list %}
{% set test_models_tags = test_models_tags | unique | list %}

{% set test_kwargs = elementary.safe_get_with_default(test_metadata, 'kwargs', {}) %}
Expand Down Expand Up @@ -143,8 +131,21 @@
{% set primary_test_model_database = tested_model_node.get('database') %}
{% set primary_test_model_schema = tested_model_node.get('schema') %}
{% set group_name = group_name or tested_model_node.get('group') %}
{% set flatten_primary_model_node = elementary.flatten_node(tested_model_node) %}
{% set primary_model_owner = flatten_primary_model_node.get('owner') %}
{% if primary_model_owner %}
{% if primary_model_owner is string %}
{% set owners = primary_model_owner.split(',') %}
{% for owner in owners %}
{% do test_models_owners.append(owner | trim) %}
{% endfor %}
{% elif primary_model_owner is iterable %}
{% do test_models_owners.extend(primary_model_owner) %}
{% endif %}
{% endif %}
{%- endif -%}
{%- endif -%}
{% set test_models_owners = test_models_owners | unique | list %}

{%- if primary_test_model_database is none or primary_test_model_schema is none -%}
{# This is mainly here to support singular test cases with multiple referred models, in this case the tested node is being used to extract the db and schema #}
Expand Down
Loading