+
+```
+
+##Usage example
+
+Teams integration : Teams webhooks are deprecated. The only way to send alarts to Teams will soon be PowerAutomate workflows
+
+Let's create a Teams notification
+
+
+
+##Create a new Team
+
+Go to the Microsoft Teams desktop app and create a new team.
+
+
+
+Create a team from from a template and use the `From Scratch` template.
+
+
+
+Choose `Public` as the kind of a team.
+
+
+
+Call it `Elementary` (or whatever you prefer) and connect it to the workspace of your choice.
+
+
+
+
+
+
+
+## Create the workflow
+
+Create a new `instant cloud flow` and choose "When a HTTP request is received" as trigger
+
+
+
+Choose "Anyone" for the parameter "Who can trigger the flow" and paste this JSON schema
+
+```
+{
+ "type": "object",
+ "properties": {
+ "target_type": {
+ "type": "string"
+ },
+ "target_channel": {
+ "type": "string"
+ },
+ "target_recipients": {
+ "type": "string"
+ },
+ "adaptative_card": {
+ "type": "object"
+ }
+ }
+}
+```
+
+
+
+Add a new action "Post card in a chat or channel"
+
+
+
+Verify on the bottom that you are connected to your Teams and choose the destination of the notification, eg a channel
+
+Choose your team, then channel, and use the dynamic tool to select "adaptative_card" pour the so named field
+
+
+
+Note : you can use the field "target_type' et "target_channel" coming from the trigger to make the channel dynamic
+
+Save your instant flow, you can now copy the trigger public URL
+
+
+
+
+
+
+
+## Configure elementary alerts
+
+Create or update your config.yml elementary config file with theses parameters :
+
+```
+webhook:
+ http_content_type: application/json
+ template: teams-webhook-template.json
+ http_url:
+```
+
+That's all. Monitor action should now send alerts to the teams channel
+
+A single test failure alert :
+
+
+
+A grouped alert :
+
+
+
+
+
+## Custom template
+
+Stock templates reside in elementary/monitor/data_monitoring/alerts/integrations/webhook/templates directory
+
+You can copy anyone of the template in a local directory, adapt it and use it with any HTTP webhook system.
+
+If you need to embed HTML into a JSON payload, you can define the HTML in a HTML template the embed it into a JSON template
+
+The HTML template :
+
+```
+
+
+
+ {{ title }}
+
+
+```
+
+The JSON template :
+
+```
+{
+ "messages": [
+ {
+ "html": {{ included_html }},
+ "from": {
+ "email": "god@even.com"
+ },
+ "replyTo": {
+ "email": "god@even.com"
+ },
+ "subject": "{{ title }}",
+ "to": [
+ {% for recipient in recipients %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "email": "{{ recipient }}"
+ }
+ {% endfor %}
+ ]
+ }
+ ]
+}
+```
+
+The config.yml file :
+
+```
+webhook:
+ http_content_type: application/json
+ template_dir: /home/workspace/templates
+ template: json-template.json
+ included_html_template: html-template.html
+ http_url: https://some_unauthenticated_private_api/v1/email/send
+
+```
+
+## Execute the CLI
+
+Make sure to run the following command after your dbt runs and tests:
+
+```
+edr monitor --group-by [table | alert]
+```
+---
+
+## Alert on source freshness failures
+
+_Not supported in dbt cloud_
+
+To alert on source freshness, you will need to run `edr run-operation upload-source-freshness` right after each execution of `dbt source freshness`.
+This operation will upload the results to a table, and the execution of `edr monitor` will send the actual alert.
+
+- Note that `dbt source freshness` and `upload-source-freshness` needs to run from the same machine.
+- Note that `upload-source-freshness` requires passing `--project-dir` argument.
+
+## Continuous alerting
+
+In order to monitor continuously, use your orchestrator to execute it regularly (we recommend running it right after
+your dbt job ends to monitor the latest data updates).
+
+Read more about how to deploy [Elementary in production](/oss/deployment-and-configuration/elementary-in-production).
+If you need help or wish to consult on this, reach out to us
+on [Slack](https://elementary-data.com/community).
diff --git a/elementary/config/config.py b/elementary/config/config.py
index 3eaefef46..d0232e7be 100644
--- a/elementary/config/config.py
+++ b/elementary/config/config.py
@@ -15,6 +15,7 @@
class Config:
_SLACK = "slack"
+ _WEBHOOK = "webhook"
_AWS = "aws"
_GOOGLE = "google"
_AZURE = "azure"
@@ -73,6 +74,23 @@ def __init__(
env: str = DEFAULT_ENV,
run_dbt_deps_if_needed: Optional[bool] = None,
project_name: Optional[str] = None,
+
+ webhook_template: Optional[str] = None,
+ webhook_template_dir: Optional[str] = None,
+ webhook_included_html_template: Optional[str] = None,
+ webhook_http_url: Optional[str] = None,
+ webhook_http_content_type: Optional[str] = None,
+ webhook_http_proxy: Optional[str] = None,
+ webhook_http_ssl_verify: Optional[str] = None,
+ webhook_http_auth_scheme: Optional[str] = None,
+ webhook_http_auth_basic_user: Optional[str] = None,
+ webhook_http_auth_basic_pass: Optional[str] = None,
+ webhook_http_auth_oauth2_url: Optional[str] = None,
+ webhook_http_auth_oauth2_client_id: Optional[str] = None,
+ webhook_http_auth_oauth2_secret: Optional[str] = None,
+ webhook_http_auth_oauth2_scope: Optional[str] = None,
+ webhook_target_type: Optional[str] = None,
+ webhook_target_channel: Optional[str] = None,
):
self.config_dir = config_dir
self.profiles_dir = profiles_dir
@@ -107,6 +125,73 @@ def __init__(
config.get("timezone"),
)
+ webhook_config = config.get(self._WEBHOOK, {})
+ self.webhook_template = self._first_not_none(
+ webhook_template,
+ webhook_config.get("template"),
+ )
+ self.webhook_template_dir = self._first_not_none(
+ webhook_template_dir,
+ webhook_config.get("template_dir"),
+ )
+ self.webhook_included_html_template = self._first_not_none(
+ webhook_included_html_template,
+ webhook_config.get("included_html_template"),
+ )
+ self.webhook_http_url = self._first_not_none(
+ webhook_http_url,
+ webhook_config.get("http_url"),
+ )
+ self.webhook_http_content_type = self._first_not_none(
+ webhook_http_content_type,
+ webhook_config.get("http_content_type"),
+ )
+ self.webhook_http_proxy = self._first_not_none(
+ webhook_http_proxy,
+ webhook_config.get("http_proxy"),
+ )
+ self.webhook_http_auth_scheme = self._first_not_none(
+ webhook_http_auth_scheme,
+ webhook_config.get("http_auth_scheme"),
+ )
+ self.webhook_http_ssl_verify = self._first_not_none(
+ webhook_http_ssl_verify,
+ webhook_config.get("http_ssl_verify"),
+ )
+ self.webhook_http_auth_basic_user = self._first_not_none(
+ webhook_http_auth_basic_user,
+ webhook_config.get("http_auth_basic_user"),
+ )
+ self.webhook_http_auth_basic_pass = self._first_not_none(
+ webhook_http_auth_basic_pass,
+ webhook_config.get("http_auth_basic_pass"),
+ )
+ self.webhook_http_auth_oauth2_url = self._first_not_none(
+ webhook_http_auth_oauth2_url,
+ webhook_config.get("http_auth_oauth2_url"),
+ )
+ self.webhook_http_auth_oauth2_client_id = self._first_not_none(
+ webhook_http_auth_oauth2_client_id,
+ webhook_config.get("http_auth_oauth2_client_id"),
+ )
+ self.webhook_http_auth_oauth2_secret = self._first_not_none(
+ webhook_http_auth_oauth2_secret,
+ webhook_config.get("http_auth_oauth2_secret"),
+ ),
+ self.webhook_http_auth_oauth2_scope = self._first_not_none(
+ webhook_http_auth_oauth2_scope,
+ webhook_config.get("http_auth_oauth2_scope"),
+ )
+ self.webhook_target_type = self._first_not_none(
+ webhook_target_type,
+ webhook_config.get("target_type"),
+ )
+ self.webhook_target_channel = self._first_not_none(
+ webhook_target_channel,
+ webhook_config.get("target_channel"),
+ )
+
+
slack_config = config.get(self._SLACK, {})
self.slack_webhook = self._first_not_none(
slack_webhook,
@@ -132,6 +217,7 @@ def __init__(
self.group_alerts_threshold = self._first_not_none(
group_alerts_threshold,
slack_config.get("group_alerts_threshold"),
+ webhook_config.get("group_alerts_threshold"),
self.DEFAULT_GROUP_ALERTS_THRESHOLD,
)
@@ -229,6 +315,10 @@ def has_slack(self) -> bool:
def has_teams(self) -> bool:
return self.teams_webhook
+ @property
+ def has_webhook(self) -> bool:
+ return self.webhook_template
+
@property
def has_s3(self):
return self.s3_bucket_name
@@ -259,17 +349,17 @@ def validate_monitor(self):
provided_integrations = list(
filter(
lambda provided_integration: provided_integration,
- [self.has_slack, self.has_teams],
+ [self.has_slack, self.has_teams, self.has_webhook],
)
)
self._validate_timezone()
if not provided_integrations:
raise InvalidArgumentsError(
- "Either a Slack token and a channel, a Slack webhook or a Microsoft Teams webhook is required."
+ "Either a Slack token and a channel, or a webhook is required."
)
if len(provided_integrations) > 1:
raise InvalidArgumentsError(
- "You provided both a Slack and Teams integration. Please provide only one so we know where to send the alerts."
+ "You provided several integrations. Please provide only one so we know where to send the alerts."
)
def validate_send_report(self):
diff --git a/elementary/monitor/cli.py b/elementary/monitor/cli.py
index 019293f1f..ff6023063 100644
--- a/elementary/monitor/cli.py
+++ b/elementary/monitor/cli.py
@@ -203,7 +203,7 @@ def get_cli_properties() -> dict:
@click.option(
"--group-alerts-threshold",
type=int,
- default=Config.DEFAULT_GROUP_ALERTS_THRESHOLD,
+ default=None,
help="The threshold for all alerts in a single message.",
)
@click.option(
diff --git a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py
index 89b0519b2..60e34d97f 100644
--- a/elementary/monitor/data_monitoring/alerts/integrations/integrations.py
+++ b/elementary/monitor/data_monitoring/alerts/integrations/integrations.py
@@ -15,6 +15,9 @@
from elementary.messages.messaging_integrations.teams_webhook import (
TeamsWebhookMessagingIntegration,
)
+from elementary.monitor.data_monitoring.alerts.integrations.webhook.webhook import (
+ WebhookIntegration,
+)
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
@@ -49,6 +52,12 @@ def get_integration(
raise UnsupportedAlertIntegrationError
elif config.has_teams:
return TeamsWebhookMessagingIntegration(config.teams_webhook)
+ elif config.has_webhook:
+ return WebhookIntegration(
+ config=config,
+ tracking=tracking,
+ override_config_defaults=override_config_defaults,
+ )
else:
raise UnsupportedAlertIntegrationError
diff --git a/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/html-webhook-template.html b/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/html-webhook-template.html
new file mode 100644
index 000000000..f3f2ff337
--- /dev/null
+++ b/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/html-webhook-template.html
@@ -0,0 +1,158 @@
+
+
+
+
+
+ Adaptive Card
+
+
+
+
+ {{ title| replace('\"', '\\\"') }}
+
+ {% if subtitle and subtitle != None %}
+ {{ subtitle| replace('\"', '\\\"') }}
+ {% endif %}
+
+ {% if text and text != None %}
+ {{ text| replace('\"', '\\\"') }}
+ {% endif %}
+
+ {% if report_link and report_link != None %}
+
+ {% endif %}
+
+ {% if fields %}
+
+
Facts
+
+ {% for field in fields %}
+ - {{ field.key }}: {{ field.value| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}
+ {% endfor %}
+
+
+ {% endif %}
+
+ {% if model_errors %}
+ Model errors
+
+
+ | Error Summary |
+ Report Link |
+
+ {% for error in model_errors %}
+
+ | ⚫ {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }} |
+
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {{ error.get_report_link().text }}
+ {% endif %}
+ |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if test_failures %}
+ Test failures
+
+
+ | Failure Summary |
+ Report Link |
+
+ {% for error in test_failures %}
+
+ | 🚧 {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }} |
+
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {{ error.get_report_link().text }}
+ {% endif %}
+ |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if test_warnings %}
+ Test warnings
+
+
+ | Warning Summary |
+ Report Link |
+
+ {% for error in test_warnings %}
+
+ | ⚠️ {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }} |
+
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {{ error.get_report_link().text }}
+ {% endif %}
+ |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if test_errors %}
+ Test errors
+
+
+ | Error Summary |
+ Report Link |
+
+ {% for error in test_errors %}
+
+ | 🔴 {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }} |
+
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {{ error.get_report_link().text }}
+ {% endif %}
+ |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if anomalous_value and anomalous_value != None %}
+
+ Test results sample: {{ anomalous_value| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}
+
+ {% endif %}
+
+ {% if test_results_sample and test_results_sample != None %}
+
+ Test results sample: {{ test_results_sample| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}
+
+ {% endif %}
+
+
+
\ No newline at end of file
diff --git a/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/teams-webhook-template.json b/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/teams-webhook-template.json
new file mode 100644
index 000000000..7bc9d2e03
--- /dev/null
+++ b/elementary/monitor/data_monitoring/alerts/integrations/webhook/templates/teams-webhook-template.json
@@ -0,0 +1,309 @@
+{
+ "target_type": "{{ target_type }}",
+ "target_channel": "{{ target_channel }}",
+ "target_recipients": "{{ target_recipients }}",
+ "adaptative_card":
+ {
+ "type": "AdaptiveCard",
+ "body": [
+ {
+ "type": "TextBlock",
+ "size": "Large",
+ "weight": "Bolder",
+ "text": "{{ title| replace('\"', '\\\"') }}",
+ "wrap": true
+ }
+ {% if subtitle and subtitle != None -%},
+ {
+ "type": "TextBlock",
+ "size": "Medium",
+ "weight": "Normal",
+ "text": "{{ subtitle| replace('\"', '\\\"') }}",
+ "wrap": true
+ }
+ {% endif %}
+ {% if text and text != None -%},
+ {
+ "type": "TextBlock",
+ "size": "Small",
+ "weight": "Normal",
+ "text": "{{ text| replace('\"', '\\\"') }}",
+ "wrap": true
+ }
+ {% endif %}
+ {% if report_link and report_link != None -%},
+ {
+ "type": "ActionSet",
+ "actions": [
+ {
+ "type": "Action.OpenUrl",
+ "title": "{{ report_link.text }}",
+ "url": "{{ report_link.url }}"
+ }
+ ]
+ }
+ {% endif %}
+ {% if fields -%},
+ {
+ "type": "FactSet",
+ "facts": [
+ {% for field in fields %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "title": "{{ field.key }}",
+ "value": "{{ field.value| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}"
+ }
+ {% endfor %}
+ ]
+ }
+ {% endif %}
+ {% if model_errors -%},
+ {
+ "type": "TextBlock",
+ "size": "Medium",
+ "weight": "Normal",
+ "text": "Model errors"
+ },
+ {
+ "type": "Table",
+ "columns": [
+ {
+ "width": 1
+ },
+ {
+ "width": 1
+ }
+ ],
+ "rows": [
+ {% for error in model_errors %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "type": "TableRow",
+ "cells": [
+ {
+ "type": "TableCell",
+ "items": [
+ {
+ "type": "TextBlock",
+ "text": "⚫ {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}",
+ "wrap": true
+ }
+ ]
+ },
+ {
+ "type": "TableCell",
+ "items": [
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {
+ "type": "ActionSet",
+ "actions": [
+ {
+ "type": "Action.OpenUrl",
+ "title": "{{ error.get_report_link().text }}",
+ "url": "{{ error.get_report_link().url }}"
+ }
+ ]
+ }
+ {% endif %}
+ ]
+ }
+ ]
+ }
+ {% endfor %}
+ ]
+ }
+ {% endif %}
+ {% if test_failures -%},
+ {
+ "type": "TextBlock",
+ "size": "Medium",
+ "weight": "Normal",
+ "text": "Test failures"
+ },
+ {
+ "type": "Table",
+ "columns": [
+ {
+ "width": 1
+ },
+ {
+ "width": 1
+ }
+ ],
+ "rows": [
+ {% for error in test_failures %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "type": "TableRow",
+ "cells": [
+ {
+ "type": "TableCell",
+ "items": [
+ {
+ "type": "TextBlock",
+ "text": "\uD83D\uDED1 {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}",
+ "wrap": true
+ }
+ ]
+ },
+ {
+ "type": "TableCell",
+ "items": [
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {
+ "type": "ActionSet",
+ "actions": [
+ {
+ "type": "Action.OpenUrl",
+ "title": "{{ error.get_report_link().text }}",
+ "url": "{{ error.get_report_link().url }}"
+ }
+ ]
+ }
+ {% endif %}
+ ]
+ }
+ ]
+ }
+ {% endfor %}
+ ]
+ }
+ {% endif %}
+ {% if test_warnings -%},
+ {
+ "type": "TextBlock",
+ "size": "Medium",
+ "weight": "Normal",
+ "text": "Test warnings"
+ },
+ {
+ "type": "Table",
+ "columns": [
+ {
+ "width": 1
+ },
+ {
+ "width": 1
+ }
+ ],
+ "rows": [
+ {% for error in test_warnings %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "type": "TableRow",
+ "cells": [
+ {
+ "type": "TableCell",
+ "items": [
+ {
+ "type": "TextBlock",
+ "text": "\uD83D\uDD36 {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}",
+ "wrap": true
+ }
+ ]
+ },
+ {
+ "type": "TableCell",
+ "items": [
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {
+ "type": "ActionSet",
+ "actions": [
+ {
+ "type": "Action.OpenUrl",
+ "title": "{{ error.get_report_link().text }}",
+ "url": "{{ error.get_report_link().url }}"
+ }
+ ]
+ }
+ {% endif %}
+ ]
+ }
+ ]
+ }
+ {% endfor %}
+ ]
+ }
+ {% endif %}
+ {% if test_errors -%},
+ {
+ "type": "TextBlock",
+ "size": "Medium",
+ "weight": "Normal",
+ "text": "Test errors"
+ },
+ {
+ "type": "Table",
+ "columns": [
+ {
+ "width": 1
+ },
+ {
+ "width": 1
+ }
+ ],
+ "rows": [
+ {% for error in test_errors %}
+ {% if loop.index > 1 %},{% endif %}
+ {
+ "type": "TableRow",
+ "cells": [
+ {
+ "type": "TableCell",
+ "items": [
+ {
+ "type": "TextBlock",
+ "text": "\uD83D\uDD34 {{ error.summary | replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}",
+ "wrap": true
+ }
+ ]
+ },
+ {
+ "type": "TableCell",
+ "items": [
+ {% if error.get_report_link() and error.get_report_link() != None %}
+ {
+ "type": "ActionSet",
+ "actions": [
+ {
+ "type": "Action.OpenUrl",
+ "title": "{{ error.get_report_link().text }}",
+ "url": "{{ error.get_report_link().url }}"
+ }
+ ]
+ }
+ {% endif %}
+ ]
+ }
+ ]
+ }
+ {% endfor %}
+ ]
+ }
+ {% endif %}
+ {% if anomalous_value and anomalous_value != None %},
+ {
+ "type": "FactSet",
+ "facts": [
+ {
+ "title": "Test results sample",
+ "value": "{{ anomalous_value| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}"
+ }
+ ]
+ },
+ {% endif %}
+ {% if test_results_sample and test_results_sample != None %},
+ {
+ "type": "FactSet",
+ "facts": [
+ {
+ "title": "Test results sample",
+ "value": "{{ test_results_sample| replace('\\', '') | replace('\"', '\\\"') | replace('\n', ' ') }}"
+ }
+ ]
+ }
+ {% endif %}
+
+ ]
+ }
+}
diff --git a/elementary/monitor/data_monitoring/alerts/integrations/webhook/webhook.py b/elementary/monitor/data_monitoring/alerts/integrations/webhook/webhook.py
new file mode 100644
index 000000000..9122def75
--- /dev/null
+++ b/elementary/monitor/data_monitoring/alerts/integrations/webhook/webhook.py
@@ -0,0 +1,714 @@
+import json
+import simplejson
+import requests
+from datetime import datetime, timedelta
+from typing import Any, Dict, Optional, Union
+from requests.auth import HTTPBasicAuth
+from oauthlib.oauth2 import BackendApplicationClient
+from requests_oauthlib import OAuth2Session
+import pandas as pd
+from pymsteams import cardsection, potentialaction
+
+
+from elementary.config.config import Config
+from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts, BaseAlertsGroup
+from elementary.monitor.alerts.model_alert import ModelAlertModel
+from elementary.monitor.alerts.source_freshness_alert import SourceFreshnessAlertModel
+from elementary.monitor.alerts.test_alert import TestAlertModel
+from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
+ BaseIntegration,
+)
+
+from elementary.utils.json_utils import (
+ list_of_lists_of_strings_to_comma_delimited_unique_strings,
+)
+from elementary.tracking.tracking_interface import Tracking
+from elementary.utils.log import get_logger
+from elementary.utils.strings import prettify_and_dedup_list
+from jinja2 import Environment, FileSystemLoader, PackageLoader
+
+
+logger = get_logger(__name__)
+
+TABLE_FIELD = "table"
+COLUMN_FIELD = "column"
+DESCRIPTION_FIELD = "description"
+OWNERS_FIELD = "owners"
+TAGS_FIELD = "tags"
+SUBSCRIBERS_FIELD = "subscribers"
+RESULT_MESSAGE_FIELD = "result_message"
+TEST_PARAMS_FIELD = "test_parameters"
+TEST_QUERY_FIELD = "test_query"
+TEST_RESULTS_SAMPLE_FIELD = "test_results_sample"
+DEFAULT_ALERT_FIELDS = [
+ TABLE_FIELD,
+ COLUMN_FIELD,
+ DESCRIPTION_FIELD,
+ OWNERS_FIELD,
+ TAGS_FIELD,
+ SUBSCRIBERS_FIELD,
+ RESULT_MESSAGE_FIELD,
+ TEST_PARAMS_FIELD,
+ TEST_QUERY_FIELD,
+ TEST_RESULTS_SAMPLE_FIELD,
+]
+
+STATUS_DISPLAYS: Dict[str, Dict] = {
+ "fail": {"display_name": "Failure"},
+ "warn": {"display_name": "Warning"},
+ "error": {"display_name": "Error"},
+}
+
+
+class WebhookIntegration(BaseIntegration):
+ def __init__(
+ self,
+ config: Config,
+ tracking: Optional[Tracking] = None,
+ override_config_defaults=False,
+ *args,
+ **kwargs,
+ ) -> None:
+ self.config = config
+ self.tracking = tracking
+ self.override_config_defaults = override_config_defaults
+ super().__init__()
+
+
+ def _get_alert_template(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ GroupedByTableAlerts,
+ BaseAlertsGroup,
+ ],
+ template=None,
+ integration_params=None,
+ *args,
+ **kwargs,
+ ):
+ if isinstance(alert, TestAlertModel):
+ if alert.is_elementary_test:
+ return self._get_elementary_test_template(alert, template=template, integration_params=integration_params)
+ else:
+ return self._get_dbt_test_template(alert, template=template, integration_params=integration_params)
+ elif isinstance(alert, ModelAlertModel):
+ if alert.materialization == "snapshot":
+ return self._get_snapshot_template(alert, template=template, integration_params=integration_params)
+ else:
+ return self._get_model_template(alert, template=template, integration_params=integration_params)
+ elif isinstance(alert, SourceFreshnessAlertModel):
+ return self._get_source_freshness_template(alert, template=template, integration_params=integration_params)
+ elif isinstance(alert, GroupedByTableAlerts):
+ return self._get_group_by_table_template(alert, template=template, integration_params=integration_params)
+ elif isinstance(alert, BaseAlertsGroup):
+ return self._get_alerts_group_template(alert, template=template, integration_params=integration_params)
+
+ @staticmethod
+ def _get_alert_sub_title(
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ ],
+ ) -> str:
+ subtitle = "**"
+ subtitle += f"Status: {alert.status}"
+ if alert.suppression_interval:
+ subtitle += f" | Time: {alert.detected_at_str}"
+ subtitle += (
+ f" | Suppression interval: {alert.suppression_interval} hours"
+ )
+ else:
+ subtitle += f" | {alert.detected_at_str}"
+ subtitle += "**"
+
+ return subtitle
+
+ @staticmethod
+ def _get_display_name(alert_status: Optional[str]) -> str:
+ if alert_status is None:
+ return "Unknown"
+ return STATUS_DISPLAYS.get(alert_status, {}).get("display_name", alert_status)
+
+ def _get_report_link_if_applicable(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ GroupedByTableAlerts,
+ ],
+ ):
+ report_link = alert.get_report_link()
+ if report_link:
+ return report_link
+ return None
+
+ def _get_table_field_if_applicable(self, alert: TestAlertModel):
+ if TABLE_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ return f"_{alert.table_full_name}_"
+ return None
+
+ def _get_column_field_if_applicable(self, alert: TestAlertModel):
+ if COLUMN_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ return f'_{alert.column_name or "No column"}_'
+ return None
+
+ def _get_tags_field_if_applicable(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ ],
+ ):
+ if TAGS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ tags = prettify_and_dedup_list(alert.tags or [])
+ return f'_{tags or "No tags"}_'
+ return None
+
+ def _get_owners_field_if_applicable(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ ],
+ ):
+ if OWNERS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ owners = prettify_and_dedup_list(alert.owners or [])
+ return f'_{owners or "No owners"}_'
+ return None
+
+ def _get_subscribers_field_if_applicable(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ ],
+ ):
+ if SUBSCRIBERS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ subscribers = prettify_and_dedup_list(alert.subscribers or [])
+ return f'_{subscribers or "No subscribers"}_'
+ return None
+
+ def _get_description_field_if_applicable(self, alert: TestAlertModel):
+ if DESCRIPTION_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ return f'_{alert.test_description or "No description"}_'
+ return None
+
+ def _get_result_message_field_if_applicable(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ ],
+ ):
+ message = None
+ if RESULT_MESSAGE_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS):
+ if isinstance(alert, ModelAlertModel):
+ if alert.message:
+ message = alert.message.strip()
+ elif isinstance(alert, TestAlertModel):
+ if alert.error_message:
+ message = alert.error_message.strip()
+ if not message:
+ message = "No result message"
+ return f"_{message}_"
+ return None
+
+ def _get_test_query_field_if_applicable(self, alert: TestAlertModel):
+ # This lacks logic to handle the case where the message is too long
+ if (
+ TEST_QUERY_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS)
+ and alert.test_results_query
+ ):
+ return f"```{alert.test_results_query.strip()}"
+ return None
+
+ def _get_test_params_field_if_applicable(self, alert: TestAlertModel):
+ if (
+ TEST_PARAMS_FIELD in (alert.alert_fields or DEFAULT_ALERT_FIELDS)
+ and alert.test_params
+ ):
+ return "*Test parameters*", f"```{alert.test_params}```"
+ return None
+
+ def _get_test_results_sample_field_if_applicable(
+ self, alert: TestAlertModel
+ ):
+ if TEST_RESULTS_SAMPLE_FIELD in (
+ alert.alert_fields or DEFAULT_ALERT_FIELDS
+ ) and alert.test_rows_sample is not None and len(alert.test_rows_sample) > 0:
+ df = pd.DataFrame(alert.test_rows_sample)
+ return df.to_string(index=False)
+ return None
+
+ def _get_test_anomalous_value_if_applicable(
+ self, alert: TestAlertModel
+ ):
+ if TEST_RESULTS_SAMPLE_FIELD in (
+ alert.alert_fields or DEFAULT_ALERT_FIELDS
+ ) and alert.test_type == "anomaly_detection" :
+ anomalous_value = alert.other
+ if alert.column_name:
+ return f"Column: {alert.column_name} | Anomalous value: {anomalous_value}"
+ else:
+ return f"Anomalous value: {anomalous_value}"
+ return None
+
+ def _get_recipients(self,alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ AlertsGroup
+ ]):
+ recipients = []
+ if isinstance(alert, AlertsGroup):
+ for alert in alert.alerts:
+ if alert.owners is not None:
+ recipients = recipients + alert.owners
+ if alert.subscribers is not None:
+ recipients = recipients + alert.subscribers
+ else:
+ if alert.owners is not None:
+ recipients = recipients + alert.owners
+ if alert.subscribers is not None:
+ recipients = recipients + alert.subscribers
+ return recipients
+
+ def _initial_client(self, *args, **kwargs):
+ pass
+
+ def _get_dbt_test_template(self, alert: TestAlertModel, template=None, integration_params: Dict=None, *args, **kwargs):
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = self._get_alert_sub_title(alert)
+ report_link = self._get_report_link_if_applicable(alert)
+ fields = [
+ {"key":"Table", "value": self._get_table_field_if_applicable(alert)},
+ {"key":"Column", "value": self._get_column_field_if_applicable(alert)},
+ {"key":"Owners", "value": self._get_owners_field_if_applicable(alert)},
+ {"key":"Subscribers", "value": self._get_subscribers_field_if_applicable(alert)},
+ {"key":"Description", "value": self._get_description_field_if_applicable(alert)},
+ {"key":"Result message", "value": self._get_result_message_field_if_applicable(alert)},
+ {"key":"Test query", "value": self._get_test_query_field_if_applicable(alert)},
+ {"key":"Test params", "value": self._get_test_params_field_if_applicable(alert)}
+ ]
+ return template.render(alert=alert,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ title=title,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ test_results_sample=self._get_test_results_sample_field_if_applicable(alert),
+
+ )
+
+ def _get_elementary_test_template(self, alert: TestAlertModel, template=None, integration_params=None, *args, **kwargs):
+ if alert.test_type == "schema_change":
+ title = f"{alert.summary}"
+ else:
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = self._get_alert_sub_title(alert)
+ report_link = self._get_report_link_if_applicable(alert)
+
+ anomalous_value = (
+ alert.other if alert.test_type == "anomaly_detection" else None
+ )
+
+ fields = [
+ {"key":"Table", "value": self._get_table_field_if_applicable(alert)},
+ {"key":"Column", "value": self._get_column_field_if_applicable(alert)},
+ {"key":"Owners", "value": self._get_owners_field_if_applicable(alert)},
+ {"key":"Subscribers", "value": self._get_subscribers_field_if_applicable(alert)},
+ {"key":"Description", "value": self._get_description_field_if_applicable(alert)},
+ {"key":"Result message", "value": self._get_result_message_field_if_applicable(alert)},
+ {"key":"Test query", "value": self._get_test_query_field_if_applicable(alert)},
+ {"key":"Test params", "value": self._get_test_params_field_if_applicable(alert)}
+ ]
+ return template.render(alert=alert,
+ title=title,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ test_results_sample=self._get_test_results_sample_field_if_applicable(alert),
+ anomalous_value=self._get_test_anomalous_value_if_applicable(alert)
+ )
+
+
+ def _get_model_template(self, alert: ModelAlertModel, template=None, integration_params=None, *args, **kwargs):
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = self._get_alert_sub_title(alert)
+ report_link = self._get_report_link_if_applicable(alert)
+
+ fields = [
+ {"key":"Tags", "value": self._get_tags_field_if_applicable(alert)},
+ {"key":"Owners", "value": self._get_owners_field_if_applicable(alert)},
+ {"key":"Subscribers", "value": self._get_subscribers_field_if_applicable(alert)},
+ {"key":"Result message", "value": self._get_result_message_field_if_applicable(alert)}
+ ]
+
+ if alert.materialization:
+ fields.append({"key": "Materialisation", "value": f"`{str(alert.materialization)}`"})
+ if alert.full_refresh:
+ fields.append({"key":"Full refresh", "value": f"`{alert.full_refresh}`"})
+ if alert.path:
+ fields.append({"key": "Path", "value": f"`{alert.path}`"})
+
+ return template.render(alert=alert,
+ title=title,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ )
+
+
+ def _get_snapshot_template(self, alert: ModelAlertModel, template=None, integration_params=None, *args, **kwargs):
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = self._get_alert_sub_title(alert)
+ report_link = self._get_report_link_if_applicable(alert)
+
+ fields = [
+ {"key":"Tags", "value": self._get_tags_field_if_applicable(alert)},
+ {"key":"Owners", "value": self._get_owners_field_if_applicable(alert)},
+ {"key":"Subscribers", "value": self._get_subscribers_field_if_applicable(alert)},
+ {"key":"Result_message", "value": self._get_result_message_field_if_applicable(alert)}
+ ]
+
+ if alert.original_path:
+ fields.append({"key": "Path", "value": f"`{alert.original_path}`"})
+
+ return template.render(title=title,
+ alert=alert,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ )
+
+
+ def _get_source_freshness_template(
+ self, alert: SourceFreshnessAlertModel, template=None, integration_params=None, *args, **kwargs
+ ):
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = self._get_alert_sub_title(alert)
+ report_link = self._get_report_link_if_applicable(alert)
+
+ fields = [
+ {"key":"Tags", "value": self._get_tags_field_if_applicable(alert)},
+ {"key":"Owners", "value": self._get_owners_field_if_applicable(alert)},
+ {"key":"Subscribers", "value": self._get_subscribers_field_if_applicable(alert)}
+ ]
+
+ if alert.freshness_description:
+ fields.append({"key": "Description", "value": f'_{alert.freshness_description or "No description"}_'})
+
+ if alert.status == "runtime error":
+ fields.append({"key": "Result message", "value": f"Failed to calculate the source freshness\n```{alert.error}```"})
+ else:
+ fields.append({"key": "Result message", "value": f"```{alert.result_description}```"})
+ fields.append({"key": "Time elapsed", "value":f"{timedelta(seconds=alert.max_loaded_at_time_ago_in_s) if alert.max_loaded_at_time_ago_in_s else 'N/A'}"})
+ fields.append({"key": "Last record at", "value":f"{alert.max_loaded_at}"})
+ fields.append({"key": "Sampled at", "value":f"{alert.snapshotted_at_str}"})
+ if alert.error_after:
+ fields.append({"key": "Error after", "value": f"`{alert.error_after}`"})
+ fields.append({"key": "Warn after", "value": f"`{alert.warn_after}`"})
+ fields.append({"key": "Filter", "value": f"`{alert.filter}`"})
+ if alert.path:
+ fields.append({"key": "Path", "value": f"`{alert.path}`"})
+
+ return template.render(title=title,
+ alert=alert,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ )
+
+ def _get_group_by_table_template(
+ self, alert: GroupedByTableAlerts, template=None, integration_params=None, *args, **kwargs
+ ):
+ alerts = alert.alerts
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+ subtitle = ""
+
+ if alert.model_errors:
+ subtitle = (
+ subtitle
+ + (" | " + f"😵 Model errors: {len(alert.model_errors)}")
+ if subtitle
+ else f"😵 Model errors: {len(alert.model_errors)}"
+ )
+ if alert.test_failures:
+ subtitle = (
+ subtitle
+ + (" | " + f"🔺 Test failures: {len(alert.test_failures)}")
+ if subtitle
+ else f"🔺 Test failures: {len(alert.test_failures)}"
+ )
+ if alert.test_warnings:
+ subtitle = (
+ subtitle
+ + (" | " + f"⚠ Test warnings: {len(alert.test_warnings)}")
+ if subtitle
+ else f"⚠ Test warnings: {len(alert.test_warnings)}"
+ )
+ if alert.test_errors:
+ subtitle = (
+ subtitle + (" | " + f"❗ Test errors: {len(alert.test_errors)}")
+ if subtitle
+ else f"❗ Test errors: {len(alert.test_errors)}"
+ )
+
+ report_link = self._get_report_link_if_applicable(alert)
+
+ tags = list_of_lists_of_strings_to_comma_delimited_unique_strings(
+ [alert.tags or [] for alert in alerts]
+ )
+ owners = list_of_lists_of_strings_to_comma_delimited_unique_strings(
+ [alert.owners or [] for alert in alerts]
+ )
+ subscribers = list_of_lists_of_strings_to_comma_delimited_unique_strings(
+ [alert.subscribers or [] for alert in alerts]
+ )
+ fields = [
+ {"key":"Tags", "value": f'_{tags if tags else "No tags"}_'},
+ {"key":"Owners", "value": f'_{owners if owners else "No owners"}_'},
+ {"key":"Subscribers", "value": f'_{subscribers if subscribers else "No subscribers"}_'}
+ ]
+ return template.render(title=title,
+ alert=alert,
+ subtitle=subtitle,
+ report_link=report_link,
+ fields=fields,
+ model_errors=alert.model_errors,
+ test_failures=alert.test_failures,
+ test_warnings=alert.test_warnings,
+ test_errors=alert.test_errors,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ )
+
+ def _get_alerts_group_template(self, alert: AlertsGroup, template=None, integration_params=None, *args, **kwargs): # type: ignore[override]
+ title = f"{self._get_display_name(alert.status)}: {alert.summary}"
+
+ subtitle = ""
+ if alert.model_errors:
+ subtitle = (
+ subtitle
+ + (" | " + f"⚫ Model errors: {len(alert.model_errors)}")
+ if subtitle
+ else f"⚫ Model errors: {len(alert.model_errors)}"
+ )
+ if alert.test_failures:
+ subtitle = (
+ subtitle
+ + (" | " + f"🛑 Test failures: {len(alert.test_failures)}")
+ if subtitle
+ else f"🛑 Test failures: {len(alert.test_failures)}"
+ )
+ if alert.test_warnings:
+ subtitle = (
+ subtitle
+ + (" | " + f"🔶 Test warnings: {len(alert.test_warnings)}")
+ if subtitle
+ else f"🔶 Test warnings: {len(alert.test_warnings)}"
+ )
+ if alert.test_errors:
+ subtitle = (
+ subtitle + (" | " + f"🔴 Test errors: {len(alert.test_errors)}")
+ if subtitle
+ else f"🔴 Test errors: {len(alert.test_errors)}"
+ )
+ return template.render(title=title,
+ alert=alert,
+ subtitle=subtitle,
+ model_errors=alert.model_errors,
+ test_failures=alert.test_failures,
+ test_warnings=alert.test_warnings,
+ test_errors=alert.test_errors,
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ included_html=integration_params.get("included_html"),
+ )
+
+
+ def _get_fallback_template(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ GroupedByTableAlerts,
+ AlertsGroup,
+ ],
+ template=None,
+ integration_params=None,
+ *args,
+ **kwargs,
+ ):
+ return template.render(title="Oops, we failed to format the alert ! -_-' Please share this with the Elementary team via or a issue.",
+ text=f"```{json.dumps(alert.data, indent=2)}",
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ alert=alert
+ )
+
+ def _get_test_message_template(self, template=None, integration_params=None, *args, **kwargs):
+ return template.render(title="This is a test message generated by Elementary!",
+ text=f"Elementary monitor ran successfully on {datetime.now().strftime('%Y-%m-%d %H:%M')}",
+ target_type=integration_params.get("target_type"),
+ target_channel=integration_params.get("target_channel"),
+ recipients=integration_params.get("recipients"),
+ )
+
+ def _get_integrations_params(
+ self,
+ alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ GroupedByTableAlerts,
+ BaseAlertsGroup,
+ None
+ ],
+ *args,
+ **kwargs,
+ ) -> Dict[str, Any]:
+ integration_params = dict()
+ if alert is not None and alert.unified_meta.get("webhook_target_type"):
+ target_type = alert.unified_meta.get("webhook_target_type")
+ logger.debug(f"Using target_type from meta: {target_type}")
+ else:
+ target_type = self.config.webhook_target_type
+ integration_params.update({"target_type": target_type})
+
+ if alert is not None and alert.unified_meta.get("webhook_target_channel"):
+ target_channel = alert.unified_meta.get("webhook_target_channel")
+ logger.debug(f"Using target_channel from meta: {target_channel}")
+ else:
+ target_channel = self.config.webhook_target_channel
+ integration_params.update({"target_channel": target_channel})
+
+ integration_params.update({"recipients": self._get_recipients(alert)})
+
+ return integration_params
+
+
+ def send_alert(self, alert: Union[
+ TestAlertModel,
+ ModelAlertModel,
+ SourceFreshnessAlertModel,
+ GroupedByTableAlerts,
+ BaseAlertsGroup,
+ ], *args, **kwargs) -> bool:
+
+ if self.config.webhook_template_dir is not None:
+ env = Environment(loader=FileSystemLoader(self.config.webhook_template_dir))
+ else:
+ env = Environment(
+ loader=PackageLoader("elementary.monitor.data_monitoring.alerts.integrations.webhook", "templates"))
+ template = env.get_template(self.config.webhook_template)
+ integration_params = self._get_integrations_params(alert)
+
+ try:
+ logger.debug("Sending alert via Webhook.")
+ if self.config.webhook_included_html_template is not None:
+ included_html_template = env.get_template(self.config.webhook_included_html_template)
+ included_html = self._get_alert_template(alert, template=included_html_template, integration_params=integration_params)
+ included_html = simplejson.encoder.JSONEncoderForHTML().encode(included_html)
+ integration_params.update({"included_html": included_html})
+ request_body= self._get_alert_template(alert, template=template, integration_params=integration_params)
+ response = self.send(request_body)
+ sent_successfully = response.ok is True
+ except Exception as e:
+ logger.error(
+ f"Unable to send alert via Webhook: {e}\nSending fallback template."
+ )
+ sent_successfully = False
+
+ if not sent_successfully:
+ try:
+ response = self.send(self._get_fallback_template(alert, template=template, integration_params=integration_params))
+ fallback_sent_successfully = response.ok is True
+ except Exception as e:
+ logger.error(f"Unable to send alert fallback via Webhook: {e}")
+ fallback_sent_successfully = False
+ sent_successfully = fallback_sent_successfully
+
+ return sent_successfully
+
+ def send_test_message(self, *args, **kwargs) -> bool:
+
+ if self.config.webhook_template_dir is not None:
+ env = Environment(loader=FileSystemLoader(self.config.webhook_template_dir))
+ else:
+ env = Environment(
+ loader=PackageLoader("elementary.monitor.data_monitoring.alerts.integrations.webhook", "templates"))
+ template = env.get_template(self.config.webhook_template)
+ integration_params = self._get_integrations_params(None)
+
+ self.send(self._get_test_message_template(template=template, integration_params=integration_params))
+
+ def send(self, request_body):
+ proxies = None
+ if self.config.webhook_http_proxy is not None:
+ proxies = {
+ "http": self.config.webhook_http_proxy,
+ "https": self.config.webhook_http_proxy
+ }
+ auth = None
+ headers = {"Content-Type": self.config.webhook_http_content_type} if self.config.webhook_http_content_type is not None else {}
+ if self.config.webhook_http_auth_scheme == "basic":
+ auth = HTTPBasicAuth(self.config.webhook_http_auth_basic_user, self.config.webhook_http_auth_basic_pass)
+ elif self.config.webhook_http_auth_scheme == "oauth2":
+ scope = None
+ if self.config.webhook_http_auth_oauth2_scope is not None:
+ scope = str(self.config.webhook_http_auth_oauth2_scope).split(",")
+ client = BackendApplicationClient(client_id=self.config.webhook_http_auth_oauth2_client_id, scope=scope)
+ oauth = OAuth2Session(client=client)
+ secret = self.config.webhook_http_auth_oauth2_secret.__getitem__(0) if isinstance(self.config.webhook_http_auth_oauth2_secret, tuple) else self.config.webhook_http_auth_oauth2_secret
+ try:
+ auth_response = oauth.fetch_token(token_url=self.config.webhook_http_auth_oauth2_url, client_id=self.config.webhook_http_auth_oauth2_client_id,
+ client_secret=secret, proxies=proxies)
+ access_token = auth_response['access_token']
+
+ except Exception as e:
+ logger.error(
+ f"Unable to authenticate with oauth: {e}"
+ )
+ raise e
+
+ headers.update({
+ "Authorization": f"Bearer {access_token}"
+ })
+ if self.config.webhook_http_content_type == 'application/json':
+ return requests.post(self.config.webhook_http_url, json=json.loads(request_body), proxies=proxies, auth=auth, headers=headers, verify=self.config.webhook_http_ssl_verify)
+ else:
+ return requests.post(self.config.webhook_http_url, data=request_body, proxies=proxies, auth=auth, headers=headers, verify=self.config.webhook_http_ssl_verify)
+
diff --git a/pyproject.toml b/pyproject.toml
index ef304598a..16d9b42c6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,11 @@ numpy = "<2.0.0"
tabulate = ">= 0.9.0"
pytz = "^2025.1"
+Jinja2 = ">=3.1.5"
+oauthlib = ">=3.2.2"
+requests_oauthlib = ">=2.0.0"
+simplejson = ">=3.19.3"
+
dbt-snowflake = {version = ">=0.20,<2.0.0", optional = true}
dbt-bigquery = {version = ">=0.20,<2.0.0", optional = true}
dbt-redshift = {version = ">=0.20,<2.0.0", optional = true}