From a18795ac778b5fd55e468a1e8accc6c4cd02072a Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Tue, 29 Apr 2025 16:29:44 +0200 Subject: [PATCH] [Python] Add new task types --- .../codegen/codegen/jsonschema_patch.py | 5 -- .../databricks/bundles/jobs/__init__.py | 57 ++++++++++++ .../jobs/_models/authentication_method.py | 14 +++ .../bundles/jobs/_models/dashboard_task.py | 54 ++++++++++++ .../databricks/bundles/jobs/_models/job.py | 5 +- .../bundles/jobs/_models/power_bi_model.py | 85 ++++++++++++++++++ .../bundles/jobs/_models/power_bi_table.py | 71 +++++++++++++++ .../bundles/jobs/_models/power_bi_task.py | 88 +++++++++++++++++++ .../bundles/jobs/_models/storage_mode.py | 15 ++++ .../bundles/jobs/_models/subscription.py | 58 ++++++++++++ .../jobs/_models/subscription_subscriber.py | 36 ++++++++ .../databricks/bundles/jobs/_models/task.py | 29 ++++++ 12 files changed, 511 insertions(+), 6 deletions(-) create mode 100644 experimental/python/databricks/bundles/jobs/_models/authentication_method.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/dashboard_task.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/power_bi_model.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/power_bi_table.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/power_bi_task.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/storage_mode.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/subscription.py create mode 100644 experimental/python/databricks/bundles/jobs/_models/subscription_subscriber.py diff --git a/experimental/python/codegen/codegen/jsonschema_patch.py b/experimental/python/codegen/codegen/jsonschema_patch.py index c1b4b9c0bd..848df05069 100644 --- a/experimental/python/codegen/codegen/jsonschema_patch.py +++ b/experimental/python/codegen/codegen/jsonschema_patch.py @@ -3,11 +3,6 @@ from codegen.jsonschema import Schema REMOVED_FIELDS = { - # TODO remove as a follow-up - "jobs.Task": { - "dashboard_task", - "power_bi_task", - }, "compute.ClusterSpec": { # doesn't work, openapi schema needs to be updated to be enum "kind", diff --git a/experimental/python/databricks/bundles/jobs/__init__.py b/experimental/python/databricks/bundles/jobs/__init__.py index 495124c8ec..32fd0e6599 100644 --- a/experimental/python/databricks/bundles/jobs/__init__.py +++ b/experimental/python/databricks/bundles/jobs/__init__.py @@ -2,6 +2,8 @@ "Adlsgen2Info", "Adlsgen2InfoDict", "Adlsgen2InfoParam", + "AuthenticationMethod", + "AuthenticationMethodParam", "AutoScale", "AutoScaleDict", "AutoScaleParam", @@ -43,6 +45,9 @@ "CronSchedule", "CronScheduleDict", "CronScheduleParam", + "DashboardTask", + "DashboardTaskDict", + "DashboardTaskParam", "DataSecurityMode", "DataSecurityModeParam", "DbfsStorageInfo", @@ -153,6 +158,15 @@ "PipelineTask", "PipelineTaskDict", "PipelineTaskParam", + "PowerBiModel", + "PowerBiModelDict", + "PowerBiModelParam", + "PowerBiTable", + "PowerBiTableDict", + "PowerBiTableParam", + "PowerBiTask", + "PowerBiTaskDict", + "PowerBiTaskParam", "PythonPyPiLibrary", "PythonPyPiLibraryDict", "PythonPyPiLibraryParam", @@ -204,6 +218,14 @@ "SqlTaskSubscription", "SqlTaskSubscriptionDict", "SqlTaskSubscriptionParam", + "StorageMode", + "StorageModeParam", + "Subscription", + "SubscriptionDict", + "SubscriptionParam", + "SubscriptionSubscriber", + "SubscriptionSubscriberDict", + "SubscriptionSubscriberParam", "TableUpdateTriggerConfiguration", "TableUpdateTriggerConfigurationDict", "TableUpdateTriggerConfigurationParam", @@ -384,6 +406,10 @@ WorkspaceStorageInfoDict, WorkspaceStorageInfoParam, ) +from databricks.bundles.jobs._models.authentication_method import ( + AuthenticationMethod, + AuthenticationMethodParam, +) from databricks.bundles.jobs._models.clean_rooms_notebook_task import ( CleanRoomsNotebookTask, CleanRoomsNotebookTaskDict, @@ -414,6 +440,11 @@ CronScheduleDict, CronScheduleParam, ) +from databricks.bundles.jobs._models.dashboard_task import ( + DashboardTask, + DashboardTaskDict, + DashboardTaskParam, +) from databricks.bundles.jobs._models.dbt_task import DbtTask, DbtTaskDict, DbtTaskParam from databricks.bundles.jobs._models.file_arrival_trigger_configuration import ( FileArrivalTriggerConfiguration, @@ -523,6 +554,21 @@ PipelineTaskDict, PipelineTaskParam, ) +from databricks.bundles.jobs._models.power_bi_model import ( + PowerBiModel, + PowerBiModelDict, + PowerBiModelParam, +) +from databricks.bundles.jobs._models.power_bi_table import ( + PowerBiTable, + PowerBiTableDict, + PowerBiTableParam, +) +from databricks.bundles.jobs._models.power_bi_task import ( + PowerBiTask, + PowerBiTaskDict, + PowerBiTaskParam, +) from databricks.bundles.jobs._models.python_wheel_task import ( PythonWheelTask, PythonWheelTaskDict, @@ -581,6 +627,17 @@ SqlTaskSubscriptionDict, SqlTaskSubscriptionParam, ) +from databricks.bundles.jobs._models.storage_mode import StorageMode, StorageModeParam +from databricks.bundles.jobs._models.subscription import ( + Subscription, + SubscriptionDict, + SubscriptionParam, +) +from databricks.bundles.jobs._models.subscription_subscriber import ( + SubscriptionSubscriber, + SubscriptionSubscriberDict, + SubscriptionSubscriberParam, +) from databricks.bundles.jobs._models.table_update_trigger_configuration import ( TableUpdateTriggerConfiguration, TableUpdateTriggerConfigurationDict, diff --git a/experimental/python/databricks/bundles/jobs/_models/authentication_method.py b/experimental/python/databricks/bundles/jobs/_models/authentication_method.py new file mode 100644 index 0000000000..f4b5b1e541 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/authentication_method.py @@ -0,0 +1,14 @@ +from enum import Enum +from typing import Literal + + +class AuthenticationMethod(Enum): + """ + :meta private: [EXPERIMENTAL] + """ + + OAUTH = "OAUTH" + PAT = "PAT" + + +AuthenticationMethodParam = Literal["OAUTH", "PAT"] | AuthenticationMethod diff --git a/experimental/python/databricks/bundles/jobs/_models/dashboard_task.py b/experimental/python/databricks/bundles/jobs/_models/dashboard_task.py new file mode 100644 index 0000000000..6284ca36d3 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/dashboard_task.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.jobs._models.subscription import ( + Subscription, + SubscriptionParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class DashboardTask: + """ + Configures the Lakeview Dashboard job task type. + """ + + dashboard_id: VariableOrOptional[str] = None + + subscription: VariableOrOptional[Subscription] = None + + warehouse_id: VariableOrOptional[str] = None + """ + Optional: The warehouse id to execute the dashboard with for the schedule. + If not specified, the default warehouse of the dashboard will be used. + """ + + @classmethod + def from_dict(cls, value: "DashboardTaskDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "DashboardTaskDict": + return _transform_to_json_value(self) # type:ignore + + +class DashboardTaskDict(TypedDict, total=False): + """""" + + dashboard_id: VariableOrOptional[str] + + subscription: VariableOrOptional[SubscriptionParam] + + warehouse_id: VariableOrOptional[str] + """ + Optional: The warehouse id to execute the dashboard with for the schedule. + If not specified, the default warehouse of the dashboard will be used. + """ + + +DashboardTaskParam = DashboardTaskDict | DashboardTask diff --git a/experimental/python/databricks/bundles/jobs/_models/job.py b/experimental/python/databricks/bundles/jobs/_models/job.py index 6d66279e04..190ae71ad7 100644 --- a/experimental/python/databricks/bundles/jobs/_models/job.py +++ b/experimental/python/databricks/bundles/jobs/_models/job.py @@ -17,7 +17,10 @@ CronSchedule, CronScheduleParam, ) -from databricks.bundles.jobs._models.git_source import GitSource, GitSourceParam +from databricks.bundles.jobs._models.git_source import ( + GitSource, + GitSourceParam, +) from databricks.bundles.jobs._models.job_cluster import JobCluster, JobClusterParam from databricks.bundles.jobs._models.job_email_notifications import ( JobEmailNotifications, diff --git a/experimental/python/databricks/bundles/jobs/_models/power_bi_model.py b/experimental/python/databricks/bundles/jobs/_models/power_bi_model.py new file mode 100644 index 0000000000..4d5f657d72 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/power_bi_model.py @@ -0,0 +1,85 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.jobs._models.authentication_method import ( + AuthenticationMethod, + AuthenticationMethodParam, +) +from databricks.bundles.jobs._models.storage_mode import StorageMode, StorageModeParam + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PowerBiModel: + """ + :meta private: [EXPERIMENTAL] + """ + + authentication_method: VariableOrOptional[AuthenticationMethod] = None + """ + How the published Power BI model authenticates to Databricks + """ + + model_name: VariableOrOptional[str] = None + """ + The name of the Power BI model + """ + + overwrite_existing: VariableOrOptional[bool] = None + """ + Whether to overwrite existing Power BI models + """ + + storage_mode: VariableOrOptional[StorageMode] = None + """ + The default storage mode of the Power BI model + """ + + workspace_name: VariableOrOptional[str] = None + """ + The name of the Power BI workspace of the model + """ + + @classmethod + def from_dict(cls, value: "PowerBiModelDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PowerBiModelDict": + return _transform_to_json_value(self) # type:ignore + + +class PowerBiModelDict(TypedDict, total=False): + """""" + + authentication_method: VariableOrOptional[AuthenticationMethodParam] + """ + How the published Power BI model authenticates to Databricks + """ + + model_name: VariableOrOptional[str] + """ + The name of the Power BI model + """ + + overwrite_existing: VariableOrOptional[bool] + """ + Whether to overwrite existing Power BI models + """ + + storage_mode: VariableOrOptional[StorageModeParam] + """ + The default storage mode of the Power BI model + """ + + workspace_name: VariableOrOptional[str] + """ + The name of the Power BI workspace of the model + """ + + +PowerBiModelParam = PowerBiModelDict | PowerBiModel diff --git a/experimental/python/databricks/bundles/jobs/_models/power_bi_table.py b/experimental/python/databricks/bundles/jobs/_models/power_bi_table.py new file mode 100644 index 0000000000..e21bd6a1ca --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/power_bi_table.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.jobs._models.storage_mode import StorageMode, StorageModeParam + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PowerBiTable: + """ + :meta private: [EXPERIMENTAL] + """ + + catalog: VariableOrOptional[str] = None + """ + The catalog name in Databricks + """ + + name: VariableOrOptional[str] = None + """ + The table name in Databricks + """ + + schema: VariableOrOptional[str] = None + """ + The schema name in Databricks + """ + + storage_mode: VariableOrOptional[StorageMode] = None + """ + The Power BI storage mode of the table + """ + + @classmethod + def from_dict(cls, value: "PowerBiTableDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PowerBiTableDict": + return _transform_to_json_value(self) # type:ignore + + +class PowerBiTableDict(TypedDict, total=False): + """""" + + catalog: VariableOrOptional[str] + """ + The catalog name in Databricks + """ + + name: VariableOrOptional[str] + """ + The table name in Databricks + """ + + schema: VariableOrOptional[str] + """ + The schema name in Databricks + """ + + storage_mode: VariableOrOptional[StorageModeParam] + """ + The Power BI storage mode of the table + """ + + +PowerBiTableParam = PowerBiTableDict | PowerBiTable diff --git a/experimental/python/databricks/bundles/jobs/_models/power_bi_task.py b/experimental/python/databricks/bundles/jobs/_models/power_bi_task.py new file mode 100644 index 0000000000..47d74f18b7 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/power_bi_task.py @@ -0,0 +1,88 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList, VariableOrOptional +from databricks.bundles.jobs._models.power_bi_model import ( + PowerBiModel, + PowerBiModelParam, +) +from databricks.bundles.jobs._models.power_bi_table import ( + PowerBiTable, + PowerBiTableParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PowerBiTask: + """ + :meta private: [EXPERIMENTAL] + """ + + connection_resource_name: VariableOrOptional[str] = None + """ + The resource name of the UC connection to authenticate from Databricks to Power BI + """ + + power_bi_model: VariableOrOptional[PowerBiModel] = None + """ + The semantic model to update + """ + + refresh_after_update: VariableOrOptional[bool] = None + """ + Whether the model should be refreshed after the update + """ + + tables: VariableOrList[PowerBiTable] = field(default_factory=list) + """ + The tables to be exported to Power BI + """ + + warehouse_id: VariableOrOptional[str] = None + """ + The SQL warehouse ID to use as the Power BI data source + """ + + @classmethod + def from_dict(cls, value: "PowerBiTaskDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PowerBiTaskDict": + return _transform_to_json_value(self) # type:ignore + + +class PowerBiTaskDict(TypedDict, total=False): + """""" + + connection_resource_name: VariableOrOptional[str] + """ + The resource name of the UC connection to authenticate from Databricks to Power BI + """ + + power_bi_model: VariableOrOptional[PowerBiModelParam] + """ + The semantic model to update + """ + + refresh_after_update: VariableOrOptional[bool] + """ + Whether the model should be refreshed after the update + """ + + tables: VariableOrList[PowerBiTableParam] + """ + The tables to be exported to Power BI + """ + + warehouse_id: VariableOrOptional[str] + """ + The SQL warehouse ID to use as the Power BI data source + """ + + +PowerBiTaskParam = PowerBiTaskDict | PowerBiTask diff --git a/experimental/python/databricks/bundles/jobs/_models/storage_mode.py b/experimental/python/databricks/bundles/jobs/_models/storage_mode.py new file mode 100644 index 0000000000..7734af69c1 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/storage_mode.py @@ -0,0 +1,15 @@ +from enum import Enum +from typing import Literal + + +class StorageMode(Enum): + """ + :meta private: [EXPERIMENTAL] + """ + + DIRECT_QUERY = "DIRECT_QUERY" + IMPORT = "IMPORT" + DUAL = "DUAL" + + +StorageModeParam = Literal["DIRECT_QUERY", "IMPORT", "DUAL"] | StorageMode diff --git a/experimental/python/databricks/bundles/jobs/_models/subscription.py b/experimental/python/databricks/bundles/jobs/_models/subscription.py new file mode 100644 index 0000000000..2d63990245 --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/subscription.py @@ -0,0 +1,58 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList, VariableOrOptional +from databricks.bundles.jobs._models.subscription_subscriber import ( + SubscriptionSubscriber, + SubscriptionSubscriberParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class Subscription: + """""" + + custom_subject: VariableOrOptional[str] = None + """ + Optional: Allows users to specify a custom subject line on the email sent + to subscribers. + """ + + paused: VariableOrOptional[bool] = None + """ + When true, the subscription will not send emails. + """ + + subscribers: VariableOrList[SubscriptionSubscriber] = field(default_factory=list) + + @classmethod + def from_dict(cls, value: "SubscriptionDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "SubscriptionDict": + return _transform_to_json_value(self) # type:ignore + + +class SubscriptionDict(TypedDict, total=False): + """""" + + custom_subject: VariableOrOptional[str] + """ + Optional: Allows users to specify a custom subject line on the email sent + to subscribers. + """ + + paused: VariableOrOptional[bool] + """ + When true, the subscription will not send emails. + """ + + subscribers: VariableOrList[SubscriptionSubscriberParam] + + +SubscriptionParam = SubscriptionDict | Subscription diff --git a/experimental/python/databricks/bundles/jobs/_models/subscription_subscriber.py b/experimental/python/databricks/bundles/jobs/_models/subscription_subscriber.py new file mode 100644 index 0000000000..eddc8fc7aa --- /dev/null +++ b/experimental/python/databricks/bundles/jobs/_models/subscription_subscriber.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class SubscriptionSubscriber: + """""" + + destination_id: VariableOrOptional[str] = None + + user_name: VariableOrOptional[str] = None + + @classmethod + def from_dict(cls, value: "SubscriptionSubscriberDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "SubscriptionSubscriberDict": + return _transform_to_json_value(self) # type:ignore + + +class SubscriptionSubscriberDict(TypedDict, total=False): + """""" + + destination_id: VariableOrOptional[str] + + user_name: VariableOrOptional[str] + + +SubscriptionSubscriberParam = SubscriptionSubscriberDict | SubscriptionSubscriber diff --git a/experimental/python/databricks/bundles/jobs/_models/task.py b/experimental/python/databricks/bundles/jobs/_models/task.py index 65edc7f76f..51c9797470 100644 --- a/experimental/python/databricks/bundles/jobs/_models/task.py +++ b/experimental/python/databricks/bundles/jobs/_models/task.py @@ -24,6 +24,10 @@ ConditionTask, ConditionTaskParam, ) +from databricks.bundles.jobs._models.dashboard_task import ( + DashboardTask, + DashboardTaskParam, +) from databricks.bundles.jobs._models.dbt_task import DbtTask, DbtTaskParam from databricks.bundles.jobs._models.for_each_task import ( ForEachTask, @@ -45,6 +49,7 @@ PipelineTask, PipelineTaskParam, ) +from databricks.bundles.jobs._models.power_bi_task import PowerBiTask, PowerBiTaskParam from databricks.bundles.jobs._models.python_wheel_task import ( PythonWheelTask, PythonWheelTaskParam, @@ -111,6 +116,11 @@ class Task: The condition task does not require a cluster to execute and does not support retries or notifications. """ + dashboard_task: VariableOrOptional[DashboardTask] = None + """ + The task refreshes a dashboard and sends a snapshot to subscribers. + """ + dbt_task: VariableOrOptional[DbtTask] = None """ The task runs one or more dbt commands when the `dbt_task` field is present. The dbt task requires both Databricks SQL and the ability to use a serverless or a pro SQL warehouse. @@ -203,6 +213,13 @@ class Task: The task triggers a pipeline update when the `pipeline_task` field is present. Only pipelines configured to use triggered more are supported. """ + power_bi_task: VariableOrOptional[PowerBiTask] = None + """ + :meta private: [EXPERIMENTAL] + + The task triggers a Power BI semantic model update when the `power_bi_task` field is present. + """ + python_wheel_task: VariableOrOptional[PythonWheelTask] = None """ The task runs a Python wheel when the `python_wheel_task` field is present. @@ -299,6 +316,11 @@ class TaskDict(TypedDict, total=False): The condition task does not require a cluster to execute and does not support retries or notifications. """ + dashboard_task: VariableOrOptional[DashboardTaskParam] + """ + The task refreshes a dashboard and sends a snapshot to subscribers. + """ + dbt_task: VariableOrOptional[DbtTaskParam] """ The task runs one or more dbt commands when the `dbt_task` field is present. The dbt task requires both Databricks SQL and the ability to use a serverless or a pro SQL warehouse. @@ -391,6 +413,13 @@ class TaskDict(TypedDict, total=False): The task triggers a pipeline update when the `pipeline_task` field is present. Only pipelines configured to use triggered more are supported. """ + power_bi_task: VariableOrOptional[PowerBiTaskParam] + """ + :meta private: [EXPERIMENTAL] + + The task triggers a Power BI semantic model update when the `power_bi_task` field is present. + """ + python_wheel_task: VariableOrOptional[PythonWheelTaskParam] """ The task runs a Python wheel when the `python_wheel_task` field is present.