diff --git a/elementary/messages/blocks.py b/elementary/messages/blocks.py index bc2160f2f..d74156bd6 100644 --- a/elementary/messages/blocks.py +++ b/elementary/messages/blocks.py @@ -20,6 +20,7 @@ class Icon(Enum): BELL = "bell" GEM = "gem" SPARKLES = "sparkles" + LINK = "link" class TextStyle(Enum): diff --git a/elementary/messages/formats/unicode.py b/elementary/messages/formats/unicode.py index 44107a026..316e0f67c 100644 --- a/elementary/messages/formats/unicode.py +++ b/elementary/messages/formats/unicode.py @@ -15,6 +15,7 @@ Icon.BELL: "🔔", Icon.GEM: "💎", Icon.SPARKLES: "✨", + Icon.LINK: "🔗", } for icon in Icon: diff --git a/elementary/monitor/alerts/alert.py b/elementary/monitor/alerts/alert.py index d96abbf59..e69be56f7 100644 --- a/elementary/monitor/alerts/alert.py +++ b/elementary/monitor/alerts/alert.py @@ -32,6 +32,12 @@ def __init__( alert_fields: Optional[List[str]] = None, elementary_database_and_schema: Optional[str] = None, env: Optional[str] = None, + job_id: Optional[str] = None, + job_name: Optional[str] = None, + job_run_id: Optional[str] = None, + job_url: Optional[str] = None, + job_run_url: Optional[str] = None, + orchestrator: Optional[str] = None, **kwargs, ): self.id = id @@ -65,6 +71,12 @@ def __init__( self.alert_fields = alert_fields self.elementary_database_and_schema = elementary_database_and_schema self.env = env + self.job_id = job_id + self.job_name = job_name + self.job_run_id = job_run_id + self.job_url = job_url + self.job_run_url = job_run_url + self.orchestrator = orchestrator @property def unified_meta(self) -> Dict: @@ -84,3 +96,33 @@ def summary(self) -> str: def get_report_link(self) -> Optional[ReportLinkData]: raise NotImplementedError + + @property + def orchestrator_info(self) -> Optional[Dict[str, str]]: + """Returns structured orchestrator metadata if available.""" + if not any( + [ + self.job_name, + self.job_run_id, + self.orchestrator, + self.job_url, + self.job_run_url, + ] + ): + return None + + info = {} + if self.job_id: + info["job_id"] = self.job_id + if self.job_name: + info["job_name"] = self.job_name + if self.job_run_id: + info["run_id"] = self.job_run_id + if self.orchestrator: + info["orchestrator"] = self.orchestrator + if self.job_url: + info["job_url"] = self.job_url + if self.job_run_url: + info["run_url"] = self.job_run_url + + return info diff --git a/elementary/monitor/alerts/alert_messages/builder.py b/elementary/monitor/alerts/alert_messages/builder.py index 102ab0fb7..d95862f36 100644 --- a/elementary/monitor/alerts/alert_messages/builder.py +++ b/elementary/monitor/alerts/alert_messages/builder.py @@ -4,11 +4,13 @@ from pydantic import BaseModel from elementary.messages.block_builders import ( + BoldTextBlock, BoldTextLineBlock, BulletListBlock, FactsBlock, ItalicTextLineBlock, JsonCodeBlock, + LinkInlineBlocks, LinksLineBlock, MentionLineBlock, NonPrimaryFactBlock, @@ -42,6 +44,9 @@ from elementary.monitor.alerts.model_alert import ModelAlertModel from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.data_monitoring.alerts.integrations.utils.orchestrator_link import ( + create_orchestrator_link, +) from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import ( ReportLinkData, ) @@ -106,6 +111,7 @@ def _get_run_alert_subtitle_block( suppression_interval: Optional[int] = None, env: Optional[str] = None, links: list[ReportLinkData] = [], + orchestrator_info: Optional[Dict[str, str]] = None, ) -> LinesBlock: summary = [] summary.append((type.capitalize() + ":", name)) @@ -114,16 +120,62 @@ def _get_run_alert_subtitle_block( summary.append(("Status:", status or "Unknown")) if detected_at_str: summary.append(("Time:", detected_at_str)) + + # Initialize subtitle lines with summary + subtitle_lines = [] + + if orchestrator_info and orchestrator_info.get("job_name"): + orchestrator_name = orchestrator_info.get("orchestrator", "orchestrator") + job_info_text = f"{orchestrator_info['job_name']} (via {orchestrator_name})" + + # Create job info with inline orchestrator link + orchestrator_link = create_orchestrator_link(orchestrator_info) + if orchestrator_link: + # Create inline blocks for job info + link + job_inlines: List[InlineBlock] = [ + BoldTextBlock(text="Job:"), + TextBlock(text=job_info_text + " | "), + ] + job_inlines.extend( + LinkInlineBlocks( + text=orchestrator_link.text, + url=orchestrator_link.url, + icon=orchestrator_link.icon, + ) + ) + + # Add custom line with job info + link instead of summary item + subtitle_lines.append(LineBlock(inlines=job_inlines)) + else: + summary.append(("Job:", job_info_text)) if suppression_interval: summary.append(("Suppression interval:", str(suppression_interval))) - subtitle_lines = [SummaryLineBlock(summary=summary)] - if links: - subtitle_lines.append( - LinksLineBlock( - links=[(link.text, link.url, link.icon) for link in links] + # Add the main summary line + subtitle_lines.append(SummaryLineBlock(summary=summary)) + + # Combine regular links with orchestrator links + all_links = [] + + # Add existing report links + for link in links: + all_links.append((link.text, link.url, link.icon)) + + # Add orchestrator link if available (only if not already added inline) + if orchestrator_info and not orchestrator_info.get("job_name"): + orchestrator_link = create_orchestrator_link(orchestrator_info) + if orchestrator_link: + all_links.append( + ( + orchestrator_link.text, + orchestrator_link.url, + orchestrator_link.icon, + ) ) - ) + + if all_links: + subtitle_lines.append(LinksLineBlock(links=all_links)) + return LinesBlock(lines=subtitle_lines) def _get_run_alert_subtitle_links( @@ -151,6 +203,7 @@ def _get_run_alert_subtitle_blocks( asset_type = "snapshot" if alert.materialization == "snapshot" else "model" asset_name = alert.alias links = self._get_run_alert_subtitle_links(alert) + orchestrator_info = alert.orchestrator_info return [ self._get_run_alert_subtitle_block( type=asset_type, @@ -160,6 +213,7 @@ def _get_run_alert_subtitle_blocks( suppression_interval=alert.suppression_interval, env=alert.env, links=links, + orchestrator_info=orchestrator_info, ) ] diff --git a/elementary/monitor/alerts/model_alert.py b/elementary/monitor/alerts/model_alert.py index 10bb29027..a732e7726 100644 --- a/elementary/monitor/alerts/model_alert.py +++ b/elementary/monitor/alerts/model_alert.py @@ -37,6 +37,12 @@ def __init__( alert_fields: Optional[List[str]] = None, elementary_database_and_schema: Optional[str] = None, env: Optional[str] = None, + job_id: Optional[str] = None, + job_name: Optional[str] = None, + job_run_id: Optional[str] = None, + job_url: Optional[str] = None, + job_run_url: Optional[str] = None, + orchestrator: Optional[str] = None, **kwargs, ): super().__init__( @@ -57,6 +63,13 @@ def __init__( alert_fields, elementary_database_and_schema, env=env, + job_id=job_id, + job_name=job_name, + job_run_id=job_run_id, + job_url=job_url, + job_run_url=job_run_url, + orchestrator=orchestrator, + **kwargs, ) self.alias = alias self.path = path diff --git a/elementary/monitor/alerts/source_freshness_alert.py b/elementary/monitor/alerts/source_freshness_alert.py index 90bae3a50..d1561bb4f 100644 --- a/elementary/monitor/alerts/source_freshness_alert.py +++ b/elementary/monitor/alerts/source_freshness_alert.py @@ -46,6 +46,12 @@ def __init__( alert_fields: Optional[List[str]] = None, elementary_database_and_schema: Optional[str] = None, env: Optional[str] = None, + job_id: Optional[str] = None, + job_name: Optional[str] = None, + job_run_id: Optional[str] = None, + job_url: Optional[str] = None, + job_run_url: Optional[str] = None, + orchestrator: Optional[str] = None, **kwargs, ): super().__init__( @@ -66,6 +72,13 @@ def __init__( alert_fields, elementary_database_and_schema, env=env, + job_id=job_id, + job_name=job_name, + job_run_id=job_run_id, + job_url=job_url, + job_run_url=job_run_url, + orchestrator=orchestrator, + **kwargs, ) self.snapshotted_at_str = ( convert_datetime_utc_str_to_timezone_str( diff --git a/elementary/monitor/alerts/test_alert.py b/elementary/monitor/alerts/test_alert.py index 384f0c04c..f2514e31d 100644 --- a/elementary/monitor/alerts/test_alert.py +++ b/elementary/monitor/alerts/test_alert.py @@ -48,6 +48,12 @@ def __init__( alert_fields: Optional[List[str]] = None, elementary_database_and_schema: Optional[str] = None, env: Optional[str] = None, + job_id: Optional[str] = None, + job_name: Optional[str] = None, + job_run_id: Optional[str] = None, + job_url: Optional[str] = None, + job_run_url: Optional[str] = None, + orchestrator: Optional[str] = None, **kwargs, ): super().__init__( @@ -68,6 +74,13 @@ def __init__( alert_fields, elementary_database_and_schema, env=env, + job_id=job_id, + job_name=job_name, + job_run_id=job_run_id, + job_url=job_url, + job_run_url=job_run_url, + orchestrator=orchestrator, + **kwargs, ) self.table_name = table_name self.test_type = test_type diff --git a/elementary/monitor/data_monitoring/alerts/integrations/utils/orchestrator_link.py b/elementary/monitor/data_monitoring/alerts/integrations/utils/orchestrator_link.py new file mode 100644 index 000000000..35de53862 --- /dev/null +++ b/elementary/monitor/data_monitoring/alerts/integrations/utils/orchestrator_link.py @@ -0,0 +1,49 @@ +from typing import Dict, Optional + +from elementary.messages.blocks import Icon +from elementary.utils.pydantic_shim import BaseModel + + +class OrchestratorLinkData(BaseModel): + url: str + text: str + orchestrator: str + icon: Optional[Icon] = None + + +def create_orchestrator_link( + orchestrator_info: Dict[str, str] +) -> Optional[OrchestratorLinkData]: + """Create an orchestrator link from orchestrator info if URL is available.""" + if not orchestrator_info or not orchestrator_info.get("run_url"): + return None + + orchestrator = orchestrator_info.get("orchestrator", "orchestrator") + + return OrchestratorLinkData( + url=orchestrator_info["run_url"], + text=f"View in {orchestrator}", + orchestrator=orchestrator, + icon=Icon.LINK, + ) + + +def create_job_link( + orchestrator_info: Dict[str, str] +) -> Optional[OrchestratorLinkData]: + """Create a job-level orchestrator link if job URL is available.""" + if not orchestrator_info or not orchestrator_info.get("job_url"): + return None + + orchestrator = orchestrator_info.get("orchestrator", "orchestrator") + job_name = orchestrator_info.get("job_name", "Job") + + # Capitalize orchestrator name for display + display_name = orchestrator.replace("_", " ").title() + + return OrchestratorLinkData( + url=orchestrator_info["job_url"], + text=f"{job_name} in {display_name}", + orchestrator=orchestrator, + icon=Icon.GEAR, + ) diff --git a/elementary/monitor/dbt_project/macros/alerts/population/model_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/population/model_alerts.sql index 65f049b85..882479710 100644 --- a/elementary/monitor/dbt_project/macros/alerts/population/model_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/population/model_alerts.sql @@ -21,7 +21,13 @@ 'owners': elementary.insensitive_get_dict_value(raw_model_alert, 'owners'), 'tags': elementary.insensitive_get_dict_value(raw_model_alert, 'tags'), 'model_meta': elementary.insensitive_get_dict_value(raw_model_alert, 'model_meta'), - 'status': status + 'status': status, + 'job_id': elementary.insensitive_get_dict_value(raw_model_alert, 'job_id'), + 'job_name': elementary.insensitive_get_dict_value(raw_model_alert, 'job_name'), + 'job_run_id': elementary.insensitive_get_dict_value(raw_model_alert, 'job_run_id'), + 'job_url': elementary.insensitive_get_dict_value(raw_model_alert, 'job_url'), + 'job_run_url': elementary.insensitive_get_dict_value(raw_model_alert, 'job_run_url'), + 'orchestrator': elementary.insensitive_get_dict_value(raw_model_alert, 'orchestrator') } %} {% set model_alert = elementary_cli.generate_alert_object( @@ -54,6 +60,14 @@ select * from {{ ref('elementary', 'dbt_seeds') }} ), + dbt_invocations as ( + select * from {{ ref('elementary', 'dbt_invocations') }} + ), + + dbt_run_results as ( + select * from {{ ref('elementary', 'dbt_run_results') }} + ), + artifacts_meta as ( select unique_id, meta from models union all @@ -106,8 +120,8 @@ all_alerts as ( select * from all_run_results - where lower(status) != 'success' - and {{ elementary.edr_cast_as_timestamp('generated_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }} + where lower(all_run_results.status) != 'success' + and {{ elementary.edr_cast_as_timestamp('all_run_results.generated_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }} ) select @@ -115,7 +129,7 @@ all_alerts.unique_id, {# Currently alert_class_id equals to unique_id - might change in the future so we return both #} all_alerts.unique_id as alert_class_id, - {{ elementary.edr_cast_as_timestamp("generated_at") }} as detected_at, + {{ elementary.edr_cast_as_timestamp("all_alerts.generated_at") }} as detected_at, {{ elementary.edr_current_timestamp() }} as created_at, all_alerts.database_name, all_alerts.materialization, @@ -128,9 +142,17 @@ all_alerts.alias, all_alerts.status, all_alerts.full_refresh, - artifacts_meta.meta as model_meta + artifacts_meta.meta as model_meta, + invocations.job_id, + invocations.job_name, + invocations.job_run_id, + invocations.job_url, + invocations.job_run_url, + invocations.orchestrator from all_alerts left join artifacts_meta on all_alerts.unique_id = artifacts_meta.unique_id + left join dbt_run_results on all_alerts.alert_id = dbt_run_results.model_execution_id + left join dbt_invocations as invocations on dbt_run_results.invocation_id = invocations.invocation_id where all_alerts.alert_id not in ( {# "this" is referring to "alerts_v2" - we are executing it using a post_hook over "alerts_v2" #} select alert_id from {{ this }} diff --git a/elementary/monitor/dbt_project/macros/alerts/population/source_freshness_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/population/source_freshness_alerts.sql index b7561aa0b..8016d146a 100644 --- a/elementary/monitor/dbt_project/macros/alerts/population/source_freshness_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/population/source_freshness_alerts.sql @@ -33,7 +33,13 @@ 'error': raw_source_freshness_alert.get('error'), 'tags': raw_source_freshness_alert.get('tags'), 'model_meta': raw_source_freshness_alert.get('model_meta'), - 'freshness_description': raw_source_freshness_alert.get('freshness_description') + 'freshness_description': raw_source_freshness_alert.get('freshness_description'), + 'job_id': raw_source_freshness_alert.get('job_id'), + 'job_name': raw_source_freshness_alert.get('job_name'), + 'job_run_id': raw_source_freshness_alert.get('job_run_id'), + 'job_url': raw_source_freshness_alert.get('job_url'), + 'job_run_url': raw_source_freshness_alert.get('job_run_url'), + 'orchestrator': raw_source_freshness_alert.get('orchestrator') } %} {% set source_freshness_alert = elementary_cli.generate_alert_object( @@ -65,6 +71,14 @@ select * from {{ sources_relation }} ), + dbt_invocations as ( + select * from {{ ref('elementary', 'dbt_invocations') }} + ), + + dbt_run_results as ( + select * from {{ ref('elementary', 'dbt_run_results') }} + ), + source_freshness_alerts as ( select results.source_freshness_execution_id as alert_id, @@ -78,7 +92,7 @@ {{ elementary.edr_current_timestamp() }} as created_at, results.max_loaded_at_time_ago_in_s, results.status as original_status, - {{ elementary_cli.normalized_source_freshness_status()}}, + {{ elementary_cli.normalized_source_freshness_status('results.status')}}, {# backwards compatibility - these fields were added together #} {% if error_after_column_exists %} results.error_after, @@ -103,11 +117,19 @@ sources.meta as model_meta, sources.freshness_error_after, sources.freshness_warn_after, - sources.freshness_filter + sources.freshness_filter, + invocations.job_id, + invocations.job_name, + invocations.job_run_id, + invocations.job_url, + invocations.job_run_url, + invocations.orchestrator from dbt_source_freshness_results as results join dbt_sources as sources on results.unique_id = sources.unique_id - where lower(status) != 'pass' + left join dbt_run_results on results.source_freshness_execution_id = dbt_run_results.model_execution_id + left join dbt_invocations as invocations on dbt_run_results.invocation_id = invocations.invocation_id + where lower(results.status) != 'pass' and {{ elementary.edr_cast_as_timestamp('results.generated_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }} ) diff --git a/elementary/monitor/dbt_project/macros/alerts/population/test_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/population/test_alerts.sql index 44e9c5fc6..8ebe9eb0e 100644 --- a/elementary/monitor/dbt_project/macros/alerts/population/test_alerts.sql +++ b/elementary/monitor/dbt_project/macros/alerts/population/test_alerts.sql @@ -39,7 +39,13 @@ 'test_meta': raw_test_alert.test_meta, 'model_meta': raw_test_alert.model_meta, 'status': status, - 'elementary_unique_id': raw_test_alert.elementary_unique_id + 'elementary_unique_id': raw_test_alert.elementary_unique_id, + 'job_id': raw_test_alert.job_id, + 'job_name': raw_test_alert.job_name, + 'job_run_id': raw_test_alert.job_run_id, + 'job_url': raw_test_alert.job_url, + 'job_run_url': raw_test_alert.job_run_url, + 'orchestrator': raw_test_alert.orchestrator } %} @@ -74,6 +80,14 @@ select * from {{ ref('elementary', 'dbt_tests') }} ), + dbt_invocations as ( + select * from {{ ref('elementary', 'dbt_invocations') }} + ), + + dbt_run_results as ( + select * from {{ ref('elementary', 'dbt_run_results') }} + ), + artifacts_meta as ( select unique_id, meta from models union all @@ -106,7 +120,7 @@ status, result_rows from elementary_test_results - where lower(status) != 'pass' + where lower(elementary_test_results.status) != 'pass' and {{ elementary.edr_cast_as_timestamp('detected_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }} ) @@ -145,10 +159,18 @@ failed_tests.result_rows, tests.meta as test_meta, tests.description as test_description, - artifacts_meta.meta as model_meta + artifacts_meta.meta as model_meta, + invocations.job_id, + invocations.job_name, + invocations.job_run_id, + invocations.job_url, + invocations.job_run_url, + invocations.orchestrator from failed_tests left join tests on failed_tests.test_unique_id = tests.unique_id left join artifacts_meta on failed_tests.model_unique_id = artifacts_meta.unique_id + left join dbt_run_results on failed_tests.test_execution_id = dbt_run_results.model_execution_id + left join dbt_invocations as invocations on dbt_run_results.invocation_id = invocations.invocation_id where failed_tests.alert_id not in ( {# "this" is referring to "alerts_v2" - we are executing it using a post_hook over "alerts_v2" #} select alert_id from {{ this }} diff --git a/elementary/monitor/dbt_project/macros/get_source_freshness_results.sql b/elementary/monitor/dbt_project/macros/get_source_freshness_results.sql index 147ef09f7..e6224f984 100644 --- a/elementary/monitor/dbt_project/macros/get_source_freshness_results.sql +++ b/elementary/monitor/dbt_project/macros/get_source_freshness_results.sql @@ -10,8 +10,8 @@ with dbt_source_freshness_results as ( select *, - {{ elementary_cli.normalized_source_freshness_status()}}, - row_number() over (partition by unique_id order by generated_at desc) as invocations_rank_index + {{ elementary_cli.normalized_source_freshness_status('status')}}, + rank() over (partition by unique_id order by generated_at desc) as invocations_rank_index from {{ ref('elementary', 'dbt_source_freshness_results') }} {% if days_back %} where {{ elementary.edr_datediff(elementary.edr_cast_as_timestamp('generated_at'), elementary.edr_current_timestamp(), 'day') }} < {{ days_back }} diff --git a/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql index d8ebab057..20c3e7fb5 100644 --- a/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql +++ b/elementary/monitor/dbt_project/macros/utils/normalized_source_freshness_status.sql @@ -1,7 +1,7 @@ -{% macro normalized_source_freshness_status() %} +{% macro normalized_source_freshness_status(status_column='status') %} case - when status = 'error' then 'fail' - when status = 'runtime error' then 'error' - else status + when {{ status_column }} = 'error' then 'fail' + when {{ status_column }} = 'runtime error' then 'error' + else {{ status_column }} end as normalized_status {% endmacro %} diff --git a/elementary/monitor/dbt_project/models/alerts/alerts_v2.sql b/elementary/monitor/dbt_project/models/alerts/alerts_v2.sql index 505ec08d6..860a1d332 100644 --- a/elementary/monitor/dbt_project/models/alerts/alerts_v2.sql +++ b/elementary/monitor/dbt_project/models/alerts/alerts_v2.sql @@ -24,6 +24,9 @@ -- depends_on: {{ ref('dbt_seeds') }} +-- depends_on: {{ ref('elementary', 'dbt_invocations') }} +-- depends_on: {{ ref('elementary', 'dbt_run_results') }} + -- backwards compatibility -- depends_on: {{ ref('elementary_cli', 'alerts') }} -- depends_on: {{ ref('elementary_cli', 'alerts_models') }} diff --git a/elementary/monitor/fetchers/alerts/schema/alert_data.py b/elementary/monitor/fetchers/alerts/schema/alert_data.py index f125b1249..eae594e1b 100644 --- a/elementary/monitor/fetchers/alerts/schema/alert_data.py +++ b/elementary/monitor/fetchers/alerts/schema/alert_data.py @@ -34,6 +34,13 @@ class BaseAlertDataSchema(BaseModel): owners: Optional[List[str]] = None model_meta: Optional[Dict] = None status: str + # Orchestrator fields + job_id: Optional[str] = None + job_name: Optional[str] = None + job_run_id: Optional[str] = None + job_url: Optional[str] = None + job_run_url: Optional[str] = None + orchestrator: Optional[str] = None @property def unified_meta(self) -> Dict: @@ -228,6 +235,13 @@ def format_alert( alert_fields=self.alert_fields, elementary_database_and_schema=elementary_database_and_schema, env=env, + # Orchestrator fields + job_id=self.job_id, + job_name=self.job_name, + job_run_id=self.job_run_id, + job_url=self.job_url, + job_run_url=self.job_run_url, + orchestrator=self.orchestrator, ) @@ -277,6 +291,13 @@ def format_alert( alert_fields=self.alert_fields, elementary_database_and_schema=elementary_database_and_schema, env=env, + # Orchestrator fields + job_id=self.job_id, + job_name=self.job_name, + job_run_id=self.job_run_id, + job_url=self.job_url, + job_run_url=self.job_run_url, + orchestrator=self.orchestrator, ) @validator("full_refresh", pre=True, always=True) @@ -346,4 +367,11 @@ def format_alert( alert_fields=self.alert_fields, elementary_database_and_schema=elementary_database_and_schema, env=env, + # Orchestrator fields + job_id=self.job_id, + job_name=self.job_name, + job_run_id=self.job_run_id, + job_url=self.job_url, + job_run_url=self.job_run_url, + orchestrator=self.orchestrator, ) diff --git a/tests/unit/alerts/alert_messages/test_orchestrator_message_simple.py b/tests/unit/alerts/alert_messages/test_orchestrator_message_simple.py new file mode 100644 index 000000000..ce2125bf2 --- /dev/null +++ b/tests/unit/alerts/alert_messages/test_orchestrator_message_simple.py @@ -0,0 +1,165 @@ +"""Simplified tests for orchestrator integration in alert messages.""" + +from unittest.mock import Mock + +from elementary.messages.formats.block_kit import format_block_kit +from elementary.monitor.alerts.model_alert import ModelAlertModel +from elementary.monitor.alerts.test_alert import TestAlertModel +from tests.unit.alerts.alert_messages.test_alert_utils import get_alert_message_body + + +class TestOrchestratorMessageIntegration: + """Test orchestrator integration in alert messages using actual message rendering.""" + + def test_alert_message_includes_orchestrator_job_info(self): + """Test that alerts with orchestrator data include job information.""" + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + status="fail", + job_name="nightly_load", + orchestrator="airflow", + ) + + # Mock get_report_link to avoid dependency + alert.get_report_link = Mock(return_value=None) + + # Generate message and render to Slack format + message_body = get_alert_message_body(alert) + slack_message = format_block_kit(message_body) + + # Convert to JSON string for easy text search + slack_json = str(slack_message) + + # Should contain job information + assert "nightly_load" in slack_json + assert "airflow" in slack_json + + def test_alert_message_includes_orchestrator_link(self): + """Test that alerts with run URL include orchestrator links.""" + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + status="fail", + job_name="nightly_load", + orchestrator="dbt_cloud", + job_run_url="https://cloud.getdbt.com/run/12345", + ) + + alert.get_report_link = Mock(return_value=None) + + message_body = get_alert_message_body(alert) + slack_message = format_block_kit(message_body) + slack_json = str(slack_message) + + # Should contain orchestrator link + assert "cloud.getdbt.com/run/12345" in slack_json + + def test_alert_without_orchestrator_data_works_normally(self): + """Test that alerts without orchestrator data work as before.""" + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + status="fail" + # No orchestrator fields + ) + + alert.get_report_link = Mock(return_value=None) + + # Should not fail and should generate a valid message + message_body = get_alert_message_body(alert) + slack_message = format_block_kit(message_body) + + # Should be a valid Slack message structure (it's a Pydantic model, not a dict) + assert slack_message is not None + assert hasattr(slack_message, "attachments") or hasattr(slack_message, "blocks") + + def test_model_alert_with_orchestrator_data(self): + """Test ModelAlertModel with orchestrator integration.""" + alert = ModelAlertModel( + id="model_alert_id", + alias="test_model", + path="/models/test_model.sql", + original_path="models/test_model.sql", + materialization="table", + full_refresh=False, + alert_class_id="model_alert_class", + status="error", + job_name="nightly_build", + orchestrator="github_actions", + job_run_url="https://github.com/org/repo/actions/runs/123", + ) + + alert.get_report_link = Mock(return_value=None) + + message_body = get_alert_message_body(alert) + slack_message = format_block_kit(message_body) + slack_json = str(slack_message) + + # Should contain model orchestrator info + assert "nightly_build" in slack_json + assert "github_actions" in slack_json + + def test_orchestrator_info_property_integration(self): + """Test that the orchestrator_info property works correctly.""" + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_name="integration_test", + job_run_id="run_123", + orchestrator="airflow", + job_url="https://airflow.example.com/job/integration_test", + job_run_url="https://airflow.example.com/run/123", + ) + + # Test orchestrator_info property + orchestrator_info = alert.orchestrator_info + + assert orchestrator_info is not None + assert orchestrator_info["job_name"] == "integration_test" + assert orchestrator_info["run_id"] == "run_123" + assert orchestrator_info["orchestrator"] == "airflow" + assert ( + orchestrator_info["job_url"] + == "https://airflow.example.com/job/integration_test" + ) + assert orchestrator_info["run_url"] == "https://airflow.example.com/run/123" + + # Test message includes this data + alert.get_report_link = Mock(return_value=None) + + message_body = get_alert_message_body(alert) + slack_message = format_block_kit(message_body) + slack_json = str(slack_message) + + assert "integration_test" in slack_json + assert "airflow" in slack_json + assert "airflow.example.com/run/123" in slack_json diff --git a/tests/unit/alerts/test_orchestrator_integration.py b/tests/unit/alerts/test_orchestrator_integration.py new file mode 100644 index 000000000..eeb74efa2 --- /dev/null +++ b/tests/unit/alerts/test_orchestrator_integration.py @@ -0,0 +1,298 @@ +import pytest + +from elementary.messages.blocks import Icon +from elementary.monitor.alerts.model_alert import ModelAlertModel +from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel +from elementary.monitor.alerts.test_alert import TestAlertModel +from elementary.monitor.data_monitoring.alerts.integrations.utils.orchestrator_link import ( + OrchestratorLinkData, + create_job_link, + create_orchestrator_link, +) + + +class TestOrchestratorLinkCreation: + """Test orchestrator link creation functionality.""" + + def test_create_orchestrator_link_with_valid_data(self): + orchestrator_info = { + "job_name": "nightly_load", + "run_id": "12345", + "orchestrator": "airflow", + "run_url": "https://airflow.example.com/run/12345", + } + + link = create_orchestrator_link(orchestrator_info) + + assert link is not None + assert link.url == "https://airflow.example.com/run/12345" + assert link.orchestrator == "airflow" + assert link.icon == Icon.LINK + + def test_create_orchestrator_link_without_url_returns_none(self): + orchestrator_info = { + "job_name": "nightly_load", + "orchestrator": "airflow" + # No run_url + } + + link = create_orchestrator_link(orchestrator_info) + assert link is None + + def test_create_orchestrator_link_with_empty_info_returns_none(self): + link = create_orchestrator_link({}) + assert link is None + + def test_create_job_link_with_valid_data(self): + orchestrator_info = { + "job_name": "nightly_load", + "orchestrator": "airflow", + "job_url": "https://airflow.example.com/job/nightly_load", + } + + link = create_job_link(orchestrator_info) + + assert link is not None + assert link.text == "nightly_load in Airflow" + assert link.url == "https://airflow.example.com/job/nightly_load" + assert link.orchestrator == "airflow" + assert link.icon == Icon.GEAR + + def test_create_job_link_without_url_returns_none(self): + orchestrator_info = { + "job_name": "nightly_load", + "orchestrator": "airflow" + # No job_url + } + + link = create_job_link(orchestrator_info) + assert link is None + + +class TestAlertModelOrchestratorInfo: + """Test AlertModel orchestrator_info property across all alert types.""" + + def test_test_alert_orchestrator_info_with_complete_data(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_name="nightly_load", + job_run_id="12345", + orchestrator="airflow", + job_url="https://airflow.example.com/job/nightly_load", + job_run_url="https://airflow.example.com/run/12345", + ) + + info = alert.orchestrator_info + assert info is not None + assert info["job_name"] == "nightly_load" + assert info["run_id"] == "12345" + assert info["orchestrator"] == "airflow" + assert info["job_url"] == "https://airflow.example.com/job/nightly_load" + assert info["run_url"] == "https://airflow.example.com/run/12345" + + def test_test_alert_orchestrator_info_with_minimal_data(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_name="test_job" + # Only job_name provided + ) + + info = alert.orchestrator_info + assert info is not None + assert info["job_name"] == "test_job" + assert len(info) == 1 + + def test_test_alert_orchestrator_info_with_no_data_returns_none(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id" + # No orchestrator fields + ) + + info = alert.orchestrator_info + assert info is None + + def test_model_alert_orchestrator_info(self): + alert = ModelAlertModel( + id="model_alert_id", + alias="test_model", + path="/models/test_model.sql", + original_path="models/test_model.sql", + materialization="table", + full_refresh=False, + alert_class_id="model_alert_class", + job_name="nightly_build", + job_run_id="67890", + orchestrator="dbt_cloud", + job_run_url="https://cloud.getdbt.com/run/67890", + ) + + info = alert.orchestrator_info + assert info is not None + assert info["job_name"] == "nightly_build" + assert info["run_id"] == "67890" + assert info["orchestrator"] == "dbt_cloud" + assert info["run_url"] == "https://cloud.getdbt.com/run/67890" + + def test_source_freshness_alert_orchestrator_info(self): + alert = SourceFreshnessAlertModel( + id="source_alert_id", + source_name="test_source", + identifier="test_table", + original_status="error", + path="sources.yml", + error="Freshness check failed", + alert_class_id="source_alert_class", + source_freshness_execution_id="exec_123", + job_name="freshness_check", + orchestrator="airflow", + job_run_url="https://airflow.example.com/run/111", + ) + + info = alert.orchestrator_info + assert info is not None + assert info["job_name"] == "freshness_check" + assert info["orchestrator"] == "airflow" + assert info["run_url"] == "https://airflow.example.com/run/111" + + +class TestOrchestratorInfoEdgeCases: + """Test edge cases and error handling for orchestrator integration.""" + + def test_orchestrator_info_with_empty_strings(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_name="", # Empty string + orchestrator="", # Empty string + job_run_url="", # Empty string + ) + + info = alert.orchestrator_info + assert info is None + + def test_orchestrator_info_filters_none_values(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_name="valid_job", + orchestrator=None, # None value + job_run_url=None, # None value + ) + + info = alert.orchestrator_info + assert info is not None + assert "job_name" in info + assert "orchestrator" not in info + assert "run_url" not in info + assert len(info) == 1 + + def test_orchestrator_info_with_only_run_id(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + job_run_id="12345", # Only run_id provided + ) + + info = alert.orchestrator_info + assert info is not None + assert info["run_id"] == "12345" + assert len(info) == 1 + + def test_orchestrator_info_with_only_orchestrator(self): + alert = TestAlertModel( + id="test_id", + test_unique_id="test_unique_id", + elementary_unique_id="elementary_unique_id", + test_name="test_name", + severity="error", + test_type="dbt_test", + test_sub_type="generic", + test_short_name="test_short_name", + alert_class_id="test_alert_class_id", + orchestrator="dbt_cloud", # Only orchestrator provided + ) + + info = alert.orchestrator_info + assert info is not None + assert info["orchestrator"] == "dbt_cloud" + assert len(info) == 1 + + +class TestOrchestratorLinkDataModel: + """Test OrchestratorLinkData model validation.""" + + def test_orchestrator_link_data_creation(self): + link = OrchestratorLinkData( + url="https://airflow.example.com/run/123", + text="View in airflow", + orchestrator="airflow", + icon=Icon.LINK, + ) + + assert link.url == "https://airflow.example.com/run/123" + assert link.text == "View in airflow" + assert link.orchestrator == "airflow" + assert link.icon == Icon.LINK + + def test_orchestrator_link_data_without_icon(self): + link = OrchestratorLinkData( + url="https://airflow.example.com/run/123", + text="View in airflow", + orchestrator="airflow" + # icon is optional + ) + + assert link.icon is None + + def test_orchestrator_link_data_required_fields(self): + # Should raise validation error if required fields are missing + with pytest.raises((TypeError, ValueError)): + OrchestratorLinkData( + # Missing required fields + icon=Icon.LINK + )