Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions elementary/messages/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Icon(Enum):
BELL = "bell"
GEM = "gem"
SPARKLES = "sparkles"
LINK = "link"


class TextStyle(Enum):
Expand Down
1 change: 1 addition & 0 deletions elementary/messages/formats/unicode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Icon.BELL: "🔔",
Icon.GEM: "💎",
Icon.SPARKLES: "✨",
Icon.LINK: "🔗",
}

for icon in Icon:
Expand Down
42 changes: 42 additions & 0 deletions elementary/monitor/alerts/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
self.id = id
Expand Down Expand Up @@ -65,6 +71,12 @@ def __init__(
self.alert_fields = alert_fields
self.elementary_database_and_schema = elementary_database_and_schema
self.env = env
self.job_id = job_id
self.job_name = job_name
self.job_run_id = job_run_id
self.job_url = job_url
self.job_run_url = job_run_url
self.orchestrator = orchestrator

@property
def unified_meta(self) -> Dict:
Expand All @@ -84,3 +96,33 @@ def summary(self) -> str:

def get_report_link(self) -> Optional[ReportLinkData]:
raise NotImplementedError

@property
def orchestrator_info(self) -> Optional[Dict[str, str]]:
"""Returns structured orchestrator metadata if available."""
if not any(
[
self.job_name,
self.job_run_id,
self.orchestrator,
self.job_url,
self.job_run_url,
]
):
return None

info = {}
if self.job_id:
info["job_id"] = self.job_id
if self.job_name:
info["job_name"] = self.job_name
if self.job_run_id:
info["run_id"] = self.job_run_id
if self.orchestrator:
info["orchestrator"] = self.orchestrator
if self.job_url:
info["job_url"] = self.job_url
if self.job_run_url:
info["run_url"] = self.job_run_url

return info
66 changes: 60 additions & 6 deletions elementary/monitor/alerts/alert_messages/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from pydantic import BaseModel

from elementary.messages.block_builders import (
BoldTextBlock,
BoldTextLineBlock,
BulletListBlock,
FactsBlock,
ItalicTextLineBlock,
JsonCodeBlock,
LinkInlineBlocks,
LinksLineBlock,
MentionLineBlock,
NonPrimaryFactBlock,
Expand Down Expand Up @@ -42,6 +44,9 @@
from elementary.monitor.alerts.model_alert import ModelAlertModel
from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
from elementary.monitor.alerts.test_alert import TestAlertModel
from elementary.monitor.data_monitoring.alerts.integrations.utils.orchestrator_link import (
create_orchestrator_link,
)
from elementary.monitor.data_monitoring.alerts.integrations.utils.report_link import (
ReportLinkData,
)
Expand Down Expand Up @@ -106,6 +111,7 @@ def _get_run_alert_subtitle_block(
suppression_interval: Optional[int] = None,
env: Optional[str] = None,
links: list[ReportLinkData] = [],
orchestrator_info: Optional[Dict[str, str]] = None,
) -> LinesBlock:
summary = []
summary.append((type.capitalize() + ":", name))
Expand All @@ -114,16 +120,62 @@ def _get_run_alert_subtitle_block(
summary.append(("Status:", status or "Unknown"))
if detected_at_str:
summary.append(("Time:", detected_at_str))

# Initialize subtitle lines with summary
subtitle_lines = []

if orchestrator_info and orchestrator_info.get("job_name"):
orchestrator_name = orchestrator_info.get("orchestrator", "orchestrator")
job_info_text = f"{orchestrator_info['job_name']} (via {orchestrator_name})"

# Create job info with inline orchestrator link
orchestrator_link = create_orchestrator_link(orchestrator_info)
if orchestrator_link:
# Create inline blocks for job info + link
job_inlines: List[InlineBlock] = [
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: Use consistent type annotation style.

Line 135 uses List[InlineBlock] (uppercase) while line 113 uses list[ReportLinkData] (lowercase). For consistency and modern Python convention (3.9+), prefer lowercase list throughout.

Apply this diff:

-                job_inlines: List[InlineBlock] = [
+                job_inlines: list[InlineBlock] = [
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
job_inlines: List[InlineBlock] = [
job_inlines: list[InlineBlock] = [
🤖 Prompt for AI Agents
In elementary/monitor/alerts/alert_messages/builder.py around line 135, the type
annotation uses the legacy uppercase typing (List[InlineBlock]) which is
inconsistent with the modern lowercase style used elsewhere (e.g., line 113 uses
list[ReportLinkData]); change List[InlineBlock] to list[InlineBlock] and remove
any unnecessary import of List from typing if it becomes unused to keep
annotations consistent with Python 3.9+ conventions.

BoldTextBlock(text="Job:"),
TextBlock(text=job_info_text + " | "),
]
job_inlines.extend(
LinkInlineBlocks(
text=orchestrator_link.text,
url=orchestrator_link.url,
icon=orchestrator_link.icon,
)
)

# Add custom line with job info + link instead of summary item
subtitle_lines.append(LineBlock(inlines=job_inlines))
else:
summary.append(("Job:", job_info_text))
if suppression_interval:
summary.append(("Suppression interval:", str(suppression_interval)))
subtitle_lines = [SummaryLineBlock(summary=summary)]

if links:
subtitle_lines.append(
LinksLineBlock(
links=[(link.text, link.url, link.icon) for link in links]
# Add the main summary line
subtitle_lines.append(SummaryLineBlock(summary=summary))

# Combine regular links with orchestrator links
all_links = []

# Add existing report links
for link in links:
all_links.append((link.text, link.url, link.icon))

# Add orchestrator link if available (only if not already added inline)
if orchestrator_info and not orchestrator_info.get("job_name"):
orchestrator_link = create_orchestrator_link(orchestrator_info)
if orchestrator_link:
all_links.append(
(
orchestrator_link.text,
orchestrator_link.url,
orchestrator_link.icon,
)
)
)

if all_links:
subtitle_lines.append(LinksLineBlock(links=all_links))

return LinesBlock(lines=subtitle_lines)

def _get_run_alert_subtitle_links(
Expand Down Expand Up @@ -151,6 +203,7 @@ def _get_run_alert_subtitle_blocks(
asset_type = "snapshot" if alert.materialization == "snapshot" else "model"
asset_name = alert.alias
links = self._get_run_alert_subtitle_links(alert)
orchestrator_info = alert.orchestrator_info
return [
self._get_run_alert_subtitle_block(
type=asset_type,
Expand All @@ -160,6 +213,7 @@ def _get_run_alert_subtitle_blocks(
suppression_interval=alert.suppression_interval,
env=alert.env,
links=links,
orchestrator_info=orchestrator_info,
)
]

Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/model_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -57,6 +63,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.alias = alias
self.path = path
Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/source_freshness_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -66,6 +72,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.snapshotted_at_str = (
convert_datetime_utc_str_to_timezone_str(
Expand Down
13 changes: 13 additions & 0 deletions elementary/monitor/alerts/test_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def __init__(
alert_fields: Optional[List[str]] = None,
elementary_database_and_schema: Optional[str] = None,
env: Optional[str] = None,
job_id: Optional[str] = None,
job_name: Optional[str] = None,
job_run_id: Optional[str] = None,
job_url: Optional[str] = None,
job_run_url: Optional[str] = None,
orchestrator: Optional[str] = None,
**kwargs,
):
super().__init__(
Expand All @@ -68,6 +74,13 @@ def __init__(
alert_fields,
elementary_database_and_schema,
env=env,
job_id=job_id,
job_name=job_name,
job_run_id=job_run_id,
job_url=job_url,
job_run_url=job_run_url,
orchestrator=orchestrator,
**kwargs,
)
self.table_name = table_name
self.test_type = test_type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Dict, Optional

from elementary.messages.blocks import Icon
from elementary.utils.pydantic_shim import BaseModel


class OrchestratorLinkData(BaseModel):
url: str
text: str
orchestrator: str
icon: Optional[Icon] = None


def create_orchestrator_link(
orchestrator_info: Dict[str, str]
) -> Optional[OrchestratorLinkData]:
"""Create an orchestrator link from orchestrator info if URL is available."""
if not orchestrator_info or not orchestrator_info.get("run_url"):
return None

orchestrator = orchestrator_info.get("orchestrator", "orchestrator")

return OrchestratorLinkData(
url=orchestrator_info["run_url"],
text=f"View in {orchestrator}",
orchestrator=orchestrator,
icon=Icon.LINK,
)


def create_job_link(
orchestrator_info: Dict[str, str]
) -> Optional[OrchestratorLinkData]:
"""Create a job-level orchestrator link if job URL is available."""
if not orchestrator_info or not orchestrator_info.get("job_url"):
return None

orchestrator = orchestrator_info.get("orchestrator", "orchestrator")
job_name = orchestrator_info.get("job_name", "Job")

# Capitalize orchestrator name for display
display_name = orchestrator.replace("_", " ").title()

return OrchestratorLinkData(
url=orchestrator_info["job_url"],
text=f"{job_name} in {display_name}",
orchestrator=orchestrator,
icon=Icon.GEAR,
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
'owners': elementary.insensitive_get_dict_value(raw_model_alert, 'owners'),
'tags': elementary.insensitive_get_dict_value(raw_model_alert, 'tags'),
'model_meta': elementary.insensitive_get_dict_value(raw_model_alert, 'model_meta'),
'status': status
'status': status,
'job_id': elementary.insensitive_get_dict_value(raw_model_alert, 'job_id'),
'job_name': elementary.insensitive_get_dict_value(raw_model_alert, 'job_name'),
'job_run_id': elementary.insensitive_get_dict_value(raw_model_alert, 'job_run_id'),
'job_url': elementary.insensitive_get_dict_value(raw_model_alert, 'job_url'),
'job_run_url': elementary.insensitive_get_dict_value(raw_model_alert, 'job_run_url'),
'orchestrator': elementary.insensitive_get_dict_value(raw_model_alert, 'orchestrator')
} %}

{% set model_alert = elementary_cli.generate_alert_object(
Expand Down Expand Up @@ -54,6 +60,14 @@
select * from {{ ref('elementary', 'dbt_seeds') }}
),

dbt_invocations as (
select * from {{ ref('elementary', 'dbt_invocations') }}
),

dbt_run_results as (
select * from {{ ref('elementary', 'dbt_run_results') }}
),

artifacts_meta as (
select unique_id, meta from models
union all
Expand Down Expand Up @@ -106,16 +120,16 @@
all_alerts as (
select *
from all_run_results
where lower(status) != 'success'
and {{ elementary.edr_cast_as_timestamp('generated_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }}
where lower(all_run_results.status) != 'success'
and {{ elementary.edr_cast_as_timestamp('all_run_results.generated_at') }} > {{ elementary.edr_timeadd('day', -1 * days_back, elementary.edr_current_timestamp()) }}
)

select
all_alerts.alert_id,
all_alerts.unique_id,
{# Currently alert_class_id equals to unique_id - might change in the future so we return both #}
all_alerts.unique_id as alert_class_id,
{{ elementary.edr_cast_as_timestamp("generated_at") }} as detected_at,
{{ elementary.edr_cast_as_timestamp("all_alerts.generated_at") }} as detected_at,
{{ elementary.edr_current_timestamp() }} as created_at,
all_alerts.database_name,
all_alerts.materialization,
Expand All @@ -128,9 +142,17 @@
all_alerts.alias,
all_alerts.status,
all_alerts.full_refresh,
artifacts_meta.meta as model_meta
artifacts_meta.meta as model_meta,
invocations.job_id,
invocations.job_name,
invocations.job_run_id,
invocations.job_url,
invocations.job_run_url,
invocations.orchestrator
from all_alerts
left join artifacts_meta on all_alerts.unique_id = artifacts_meta.unique_id
left join dbt_run_results on all_alerts.alert_id = dbt_run_results.model_execution_id
left join dbt_invocations as invocations on dbt_run_results.invocation_id = invocations.invocation_id
where all_alerts.alert_id not in (
{# "this" is referring to "alerts_v2" - we are executing it using a post_hook over "alerts_v2" #}
select alert_id from {{ this }}
Expand Down
Loading
Loading