From 2fb95ac65574c46ef7878b1f10c3f7b86c1374ed Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Thu, 3 Apr 2025 11:51:10 +0200 Subject: [PATCH 1/4] Add pipelines --- .../databricks/bundles/pipelines/__init__.py | 307 +++++++++++++++- .../bundles/pipelines/_models/cron_trigger.py | 36 ++ .../bundles/pipelines/_models/day_of_week.py | 27 ++ .../pipelines/_models/deployment_kind.py | 15 + .../pipelines/_models/event_log_spec.py | 60 ++++ .../bundles/pipelines/_models/file_library.py | 38 ++ .../bundles/pipelines/_models/filters.py | 48 +++ .../pipelines/_models/ingestion_config.py | 61 ++++ .../ingestion_gateway_pipeline_definition.py | 78 ++++ .../_models/ingestion_pipeline_definition.py | 78 ++++ .../pipelines/_models/manual_trigger.py | 27 ++ .../pipelines/_models/notebook_library.py | 38 ++ .../pipelines/_models/notifications.py | 64 ++++ .../bundles/pipelines/_models/pipeline.py | 290 ++++++++++++++- .../pipelines/_models/pipeline_cluster.py | 332 ++++++++++++++++++ .../_models/pipeline_cluster_autoscale.py | 74 ++++ .../pipeline_cluster_autoscale_mode.py | 21 ++ .../pipelines/_models/pipeline_deployment.py | 52 +++ .../pipelines/_models/pipeline_library.py | 96 +++++ .../pipelines/_models/pipeline_permission.py | 60 ++++ .../_models/pipeline_permission_level.py | 14 + .../pipelines/_models/pipeline_trigger.py | 44 +++ .../bundles/pipelines/_models/report_spec.py | 82 +++++ .../pipelines/_models/restart_window.py | 71 ++++ .../bundles/pipelines/_models/run_as.py | 54 +++ .../bundles/pipelines/_models/schema_spec.py | 82 +++++ .../bundles/pipelines/_models/table_spec.py | 102 ++++++ .../_models/table_specific_config.py | 80 +++++ .../_models/table_specific_config_scd_type.py | 18 + 29 files changed, 2340 insertions(+), 9 deletions(-) create mode 100644 experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/day_of_week.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/event_log_spec.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/file_library.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/filters.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/ingestion_config.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/ingestion_gateway_pipeline_definition.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/ingestion_pipeline_definition.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/notebook_library.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/notifications.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale_mode.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_library.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py create mode 100644 
experimental/python/databricks/bundles/pipelines/_models/pipeline_permission_level.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/report_spec.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/restart_window.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/run_as.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/schema_spec.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/table_spec.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/table_specific_config.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/table_specific_config_scd_type.py diff --git a/experimental/python/databricks/bundles/pipelines/__init__.py b/experimental/python/databricks/bundles/pipelines/__init__.py index a5da315c35..73c884493f 100644 --- a/experimental/python/databricks/bundles/pipelines/__init__.py +++ b/experimental/python/databricks/bundles/pipelines/__init__.py @@ -1,6 +1,309 @@ -from databricks.bundles.pipelines._models.pipeline import Pipeline, PipelineParam - __all__ = [ + "Adlsgen2Info", + "Adlsgen2InfoDict", + "Adlsgen2InfoParam", + "AwsAttributes", + "AwsAttributesDict", + "AwsAttributesParam", + "AwsAvailability", + "AwsAvailabilityParam", + "AzureAttributes", + "AzureAttributesDict", + "AzureAttributesParam", + "AzureAvailability", + "AzureAvailabilityParam", + "ClusterLogConf", + "ClusterLogConfDict", + "ClusterLogConfParam", + "DayOfWeek", + "DayOfWeekParam", + "DbfsStorageInfo", + "DbfsStorageInfoDict", + "DbfsStorageInfoParam", + "EbsVolumeType", + "EbsVolumeTypeParam", + "EventLogSpec", + "EventLogSpecDict", + "EventLogSpecParam", + "FileLibrary", + "FileLibraryDict", + "FileLibraryParam", + "Filters", + "FiltersDict", + "FiltersParam", + "GcpAttributes", + "GcpAttributesDict", + "GcpAttributesParam", + "GcpAvailability", + "GcpAvailabilityParam", + "GcsStorageInfo", + "GcsStorageInfoDict", + "GcsStorageInfoParam", + "IngestionConfig", + "IngestionConfigDict", + "IngestionConfigParam", + "IngestionGatewayPipelineDefinition", + "IngestionGatewayPipelineDefinitionDict", + "IngestionGatewayPipelineDefinitionParam", + "IngestionPipelineDefinition", + "IngestionPipelineDefinitionDict", + "IngestionPipelineDefinitionParam", + "InitScriptInfo", + "InitScriptInfoDict", + "InitScriptInfoParam", + "LocalFileInfo", + "LocalFileInfoDict", + "LocalFileInfoParam", + "LogAnalyticsInfo", + "LogAnalyticsInfoDict", + "LogAnalyticsInfoParam", + "MavenLibrary", + "MavenLibraryDict", + "MavenLibraryParam", + "NotebookLibrary", + "NotebookLibraryDict", + "NotebookLibraryParam", + "Notifications", + "NotificationsDict", + "NotificationsParam", "Pipeline", + "PipelineCluster", + "PipelineClusterAutoscale", + "PipelineClusterAutoscaleDict", + "PipelineClusterAutoscaleMode", + "PipelineClusterAutoscaleModeParam", + "PipelineClusterAutoscaleParam", + "PipelineClusterDict", + "PipelineClusterParam", + "PipelineDict", + "PipelineLibrary", + "PipelineLibraryDict", + "PipelineLibraryParam", "PipelineParam", + "PipelinePermission", + "PipelinePermissionDict", + "PipelinePermissionLevel", + "PipelinePermissionLevelParam", + "PipelinePermissionParam", + "ReportSpec", + "ReportSpecDict", + "ReportSpecParam", + "RestartWindow", + "RestartWindowDict", + "RestartWindowParam", + "RunAs", + "RunAsDict", + "RunAsParam", + "S3StorageInfo", + 
"S3StorageInfoDict", + "S3StorageInfoParam", + "SchemaSpec", + "SchemaSpecDict", + "SchemaSpecParam", + "TableSpec", + "TableSpecDict", + "TableSpecParam", + "TableSpecificConfig", + "TableSpecificConfigDict", + "TableSpecificConfigParam", + "TableSpecificConfigScdType", + "TableSpecificConfigScdTypeParam", + "VolumesStorageInfo", + "VolumesStorageInfoDict", + "VolumesStorageInfoParam", + "WorkspaceStorageInfo", + "WorkspaceStorageInfoDict", + "WorkspaceStorageInfoParam", ] + + +from databricks.bundles.compute._models.adlsgen2_info import ( + Adlsgen2Info, + Adlsgen2InfoDict, + Adlsgen2InfoParam, +) +from databricks.bundles.compute._models.aws_attributes import ( + AwsAttributes, + AwsAttributesDict, + AwsAttributesParam, +) +from databricks.bundles.compute._models.aws_availability import ( + AwsAvailability, + AwsAvailabilityParam, +) +from databricks.bundles.compute._models.azure_attributes import ( + AzureAttributes, + AzureAttributesDict, + AzureAttributesParam, +) +from databricks.bundles.compute._models.azure_availability import ( + AzureAvailability, + AzureAvailabilityParam, +) +from databricks.bundles.compute._models.cluster_log_conf import ( + ClusterLogConf, + ClusterLogConfDict, + ClusterLogConfParam, +) +from databricks.bundles.compute._models.dbfs_storage_info import ( + DbfsStorageInfo, + DbfsStorageInfoDict, + DbfsStorageInfoParam, +) +from databricks.bundles.compute._models.ebs_volume_type import ( + EbsVolumeType, + EbsVolumeTypeParam, +) +from databricks.bundles.compute._models.gcp_attributes import ( + GcpAttributes, + GcpAttributesDict, + GcpAttributesParam, +) +from databricks.bundles.compute._models.gcp_availability import ( + GcpAvailability, + GcpAvailabilityParam, +) +from databricks.bundles.compute._models.gcs_storage_info import ( + GcsStorageInfo, + GcsStorageInfoDict, + GcsStorageInfoParam, +) +from databricks.bundles.compute._models.init_script_info import ( + InitScriptInfo, + InitScriptInfoDict, + InitScriptInfoParam, +) +from databricks.bundles.compute._models.local_file_info import ( + LocalFileInfo, + LocalFileInfoDict, + LocalFileInfoParam, +) +from databricks.bundles.compute._models.log_analytics_info import ( + LogAnalyticsInfo, + LogAnalyticsInfoDict, + LogAnalyticsInfoParam, +) +from databricks.bundles.compute._models.maven_library import ( + MavenLibrary, + MavenLibraryDict, + MavenLibraryParam, +) +from databricks.bundles.compute._models.s3_storage_info import ( + S3StorageInfo, + S3StorageInfoDict, + S3StorageInfoParam, +) +from databricks.bundles.compute._models.volumes_storage_info import ( + VolumesStorageInfo, + VolumesStorageInfoDict, + VolumesStorageInfoParam, +) +from databricks.bundles.compute._models.workspace_storage_info import ( + WorkspaceStorageInfo, + WorkspaceStorageInfoDict, + WorkspaceStorageInfoParam, +) +from databricks.bundles.pipelines._models.day_of_week import DayOfWeek, DayOfWeekParam +from databricks.bundles.pipelines._models.event_log_spec import ( + EventLogSpec, + EventLogSpecDict, + EventLogSpecParam, +) +from databricks.bundles.pipelines._models.file_library import ( + FileLibrary, + FileLibraryDict, + FileLibraryParam, +) +from databricks.bundles.pipelines._models.filters import ( + Filters, + FiltersDict, + FiltersParam, +) +from databricks.bundles.pipelines._models.ingestion_config import ( + IngestionConfig, + IngestionConfigDict, + IngestionConfigParam, +) +from databricks.bundles.pipelines._models.ingestion_gateway_pipeline_definition import ( + IngestionGatewayPipelineDefinition, + 
IngestionGatewayPipelineDefinitionDict, + IngestionGatewayPipelineDefinitionParam, +) +from databricks.bundles.pipelines._models.ingestion_pipeline_definition import ( + IngestionPipelineDefinition, + IngestionPipelineDefinitionDict, + IngestionPipelineDefinitionParam, +) +from databricks.bundles.pipelines._models.notebook_library import ( + NotebookLibrary, + NotebookLibraryDict, + NotebookLibraryParam, +) +from databricks.bundles.pipelines._models.notifications import ( + Notifications, + NotificationsDict, + NotificationsParam, +) +from databricks.bundles.pipelines._models.pipeline import ( + Pipeline, + PipelineDict, + PipelineParam, +) +from databricks.bundles.pipelines._models.pipeline_cluster import ( + PipelineCluster, + PipelineClusterDict, + PipelineClusterParam, +) +from databricks.bundles.pipelines._models.pipeline_cluster_autoscale import ( + PipelineClusterAutoscale, + PipelineClusterAutoscaleDict, + PipelineClusterAutoscaleParam, +) +from databricks.bundles.pipelines._models.pipeline_cluster_autoscale_mode import ( + PipelineClusterAutoscaleMode, + PipelineClusterAutoscaleModeParam, +) +from databricks.bundles.pipelines._models.pipeline_library import ( + PipelineLibrary, + PipelineLibraryDict, + PipelineLibraryParam, +) +from databricks.bundles.pipelines._models.pipeline_permission import ( + PipelinePermission, + PipelinePermissionDict, + PipelinePermissionParam, +) +from databricks.bundles.pipelines._models.pipeline_permission_level import ( + PipelinePermissionLevel, + PipelinePermissionLevelParam, +) +from databricks.bundles.pipelines._models.report_spec import ( + ReportSpec, + ReportSpecDict, + ReportSpecParam, +) +from databricks.bundles.pipelines._models.restart_window import ( + RestartWindow, + RestartWindowDict, + RestartWindowParam, +) +from databricks.bundles.pipelines._models.run_as import RunAs, RunAsDict, RunAsParam +from databricks.bundles.pipelines._models.schema_spec import ( + SchemaSpec, + SchemaSpecDict, + SchemaSpecParam, +) +from databricks.bundles.pipelines._models.table_spec import ( + TableSpec, + TableSpecDict, + TableSpecParam, +) +from databricks.bundles.pipelines._models.table_specific_config import ( + TableSpecificConfig, + TableSpecificConfigDict, + TableSpecificConfigParam, +) +from databricks.bundles.pipelines._models.table_specific_config_scd_type import ( + TableSpecificConfigScdType, + TableSpecificConfigScdTypeParam, +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py new file mode 100644 index 0000000000..ab67a42658 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py @@ -0,0 +1,36 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class CronTrigger: + """""" + + quartz_cron_schedule: VariableOrOptional[str] = None + + timezone_id: VariableOrOptional[str] = None + + @classmethod + def from_dict(cls, value: "CronTriggerDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "CronTriggerDict": + return _transform_to_json_value(self) # type:ignore + + +class CronTriggerDict(TypedDict, total=False): + """""" + + quartz_cron_schedule: 
VariableOrOptional[str] + + timezone_id: VariableOrOptional[str] + + +CronTriggerParam = CronTriggerDict | CronTrigger diff --git a/experimental/python/databricks/bundles/pipelines/_models/day_of_week.py b/experimental/python/databricks/bundles/pipelines/_models/day_of_week.py new file mode 100644 index 0000000000..eaf5cbc9ff --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/day_of_week.py @@ -0,0 +1,27 @@ +from enum import Enum +from typing import Literal + + +class DayOfWeek(Enum): + """ + :meta private: [EXPERIMENTAL] + + Days of week in which the restart is allowed to happen (within a five-hour window starting at start_hour). + If not specified all days of the week will be used. + """ + + MONDAY = "MONDAY" + TUESDAY = "TUESDAY" + WEDNESDAY = "WEDNESDAY" + THURSDAY = "THURSDAY" + FRIDAY = "FRIDAY" + SATURDAY = "SATURDAY" + SUNDAY = "SUNDAY" + + +DayOfWeekParam = ( + Literal[ + "MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY" + ] + | DayOfWeek +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py b/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py new file mode 100644 index 0000000000..800382ef18 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py @@ -0,0 +1,15 @@ +from enum import Enum +from typing import Literal + + +class DeploymentKind(Enum): + """ + The deployment method that manages the pipeline: + - BUNDLE: The pipeline is managed by a Databricks Asset Bundle. + + """ + + BUNDLE = "BUNDLE" + + +DeploymentKindParam = Literal["BUNDLE"] | DeploymentKind diff --git a/experimental/python/databricks/bundles/pipelines/_models/event_log_spec.py b/experimental/python/databricks/bundles/pipelines/_models/event_log_spec.py new file mode 100644 index 0000000000..9b7f5dcdcc --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/event_log_spec.py @@ -0,0 +1,60 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class EventLogSpec: + """ + Configurable event log parameters. + """ + + catalog: VariableOrOptional[str] = None + """ + The UC catalog the event log is published under. + """ + + name: VariableOrOptional[str] = None + """ + The name the event log is published to in UC. + """ + + schema: VariableOrOptional[str] = None + """ + The UC schema the event log is published under. + """ + + @classmethod + def from_dict(cls, value: "EventLogSpecDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "EventLogSpecDict": + return _transform_to_json_value(self) # type:ignore + + +class EventLogSpecDict(TypedDict, total=False): + """""" + + catalog: VariableOrOptional[str] + """ + The UC catalog the event log is published under. + """ + + name: VariableOrOptional[str] + """ + The name the event log is published to in UC. + """ + + schema: VariableOrOptional[str] + """ + The UC schema the event log is published under. 
+ """ + + +EventLogSpecParam = EventLogSpecDict | EventLogSpec diff --git a/experimental/python/databricks/bundles/pipelines/_models/file_library.py b/experimental/python/databricks/bundles/pipelines/_models/file_library.py new file mode 100644 index 0000000000..9f57a72cb9 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/file_library.py @@ -0,0 +1,38 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class FileLibrary: + """""" + + path: VariableOrOptional[str] = None + """ + The absolute path of the file. + """ + + @classmethod + def from_dict(cls, value: "FileLibraryDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "FileLibraryDict": + return _transform_to_json_value(self) # type:ignore + + +class FileLibraryDict(TypedDict, total=False): + """""" + + path: VariableOrOptional[str] + """ + The absolute path of the file. + """ + + +FileLibraryParam = FileLibraryDict | FileLibrary diff --git a/experimental/python/databricks/bundles/pipelines/_models/filters.py b/experimental/python/databricks/bundles/pipelines/_models/filters.py new file mode 100644 index 0000000000..13f29eeb49 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/filters.py @@ -0,0 +1,48 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class Filters: + """""" + + exclude: VariableOrList[str] = field(default_factory=list) + """ + Paths to exclude. + """ + + include: VariableOrList[str] = field(default_factory=list) + """ + Paths to include. + """ + + @classmethod + def from_dict(cls, value: "FiltersDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "FiltersDict": + return _transform_to_json_value(self) # type:ignore + + +class FiltersDict(TypedDict, total=False): + """""" + + exclude: VariableOrList[str] + """ + Paths to exclude. + """ + + include: VariableOrList[str] + """ + Paths to include. 
+ """ + + +FiltersParam = FiltersDict | Filters diff --git a/experimental/python/databricks/bundles/pipelines/_models/ingestion_config.py b/experimental/python/databricks/bundles/pipelines/_models/ingestion_config.py new file mode 100644 index 0000000000..c452222df9 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/ingestion_config.py @@ -0,0 +1,61 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.report_spec import ReportSpec, ReportSpecParam +from databricks.bundles.pipelines._models.schema_spec import SchemaSpec, SchemaSpecParam +from databricks.bundles.pipelines._models.table_spec import TableSpec, TableSpecParam + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class IngestionConfig: + """""" + + report: VariableOrOptional[ReportSpec] = None + """ + Select a specific source report. + """ + + schema: VariableOrOptional[SchemaSpec] = None + """ + Select all tables from a specific source schema. + """ + + table: VariableOrOptional[TableSpec] = None + """ + Select a specific source table. + """ + + @classmethod + def from_dict(cls, value: "IngestionConfigDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "IngestionConfigDict": + return _transform_to_json_value(self) # type:ignore + + +class IngestionConfigDict(TypedDict, total=False): + """""" + + report: VariableOrOptional[ReportSpecParam] + """ + Select a specific source report. + """ + + schema: VariableOrOptional[SchemaSpecParam] + """ + Select all tables from a specific source schema. + """ + + table: VariableOrOptional[TableSpecParam] + """ + Select a specific source table. + """ + + +IngestionConfigParam = IngestionConfigDict | IngestionConfig diff --git a/experimental/python/databricks/bundles/pipelines/_models/ingestion_gateway_pipeline_definition.py b/experimental/python/databricks/bundles/pipelines/_models/ingestion_gateway_pipeline_definition.py new file mode 100644 index 0000000000..d490a0bef2 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/ingestion_gateway_pipeline_definition.py @@ -0,0 +1,78 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class IngestionGatewayPipelineDefinition: + """ + :meta private: [EXPERIMENTAL] + """ + + connection_name: VariableOrOptional[str] = None + """ + Immutable. The Unity Catalog connection that this gateway pipeline uses to communicate with the source. + """ + + gateway_storage_catalog: VariableOrOptional[str] = None + """ + Required, Immutable. The name of the catalog for the gateway pipeline's storage location. + """ + + gateway_storage_name: VariableOrOptional[str] = None + """ + Optional. The Unity Catalog-compatible name for the gateway storage location. + This is the destination to use for the data that is extracted by the gateway. + Delta Live Tables system will automatically create the storage location under the catalog and schema. 
+ + """ + + gateway_storage_schema: VariableOrOptional[str] = None + """ + Required, Immutable. The name of the schema for the gateway pipeline's storage location. + """ + + @classmethod + def from_dict(cls, value: "IngestionGatewayPipelineDefinitionDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "IngestionGatewayPipelineDefinitionDict": + return _transform_to_json_value(self) # type:ignore + + +class IngestionGatewayPipelineDefinitionDict(TypedDict, total=False): + """""" + + connection_name: VariableOrOptional[str] + """ + Immutable. The Unity Catalog connection that this gateway pipeline uses to communicate with the source. + """ + + gateway_storage_catalog: VariableOrOptional[str] + """ + Required, Immutable. The name of the catalog for the gateway pipeline's storage location. + """ + + gateway_storage_name: VariableOrOptional[str] + """ + Optional. The Unity Catalog-compatible name for the gateway storage location. + This is the destination to use for the data that is extracted by the gateway. + Delta Live Tables system will automatically create the storage location under the catalog and schema. + + """ + + gateway_storage_schema: VariableOrOptional[str] + """ + Required, Immutable. The name of the schema for the gateway pipeline's storage location. + """ + + +IngestionGatewayPipelineDefinitionParam = ( + IngestionGatewayPipelineDefinitionDict | IngestionGatewayPipelineDefinition +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/ingestion_pipeline_definition.py b/experimental/python/databricks/bundles/pipelines/_models/ingestion_pipeline_definition.py new file mode 100644 index 0000000000..cf1f451a3a --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/ingestion_pipeline_definition.py @@ -0,0 +1,78 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList, VariableOrOptional +from databricks.bundles.pipelines._models.ingestion_config import ( + IngestionConfig, + IngestionConfigParam, +) +from databricks.bundles.pipelines._models.table_specific_config import ( + TableSpecificConfig, + TableSpecificConfigParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class IngestionPipelineDefinition: + """""" + + connection_name: VariableOrOptional[str] = None + """ + Immutable. The Unity Catalog connection that this ingestion pipeline uses to communicate with the source. This is used with connectors for applications like Salesforce, Workday, and so on. + """ + + ingestion_gateway_id: VariableOrOptional[str] = None + """ + Immutable. Identifier for the gateway that is used by this ingestion pipeline to communicate with the source database. This is used with connectors to databases like SQL Server. + """ + + objects: VariableOrList[IngestionConfig] = field(default_factory=list) + """ + Required. Settings specifying tables to replicate and the destination for the replicated tables. + """ + + table_configuration: VariableOrOptional[TableSpecificConfig] = None + """ + Configuration settings to control the ingestion of tables. These settings are applied to all tables in the pipeline.
+ """ + + @classmethod + def from_dict(cls, value: "IngestionPipelineDefinitionDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "IngestionPipelineDefinitionDict": + return _transform_to_json_value(self) # type:ignore + + +class IngestionPipelineDefinitionDict(TypedDict, total=False): + """""" + + connection_name: VariableOrOptional[str] + """ + Immutable. The Unity Catalog connection that this ingestion pipeline uses to communicate with the source. This is used with connectors for applications like Salesforce, Workday, and so on. + """ + + ingestion_gateway_id: VariableOrOptional[str] + """ + Immutable. Identifier for the gateway that is used by this ingestion pipeline to communicate with the source database. This is used with connectors to databases like SQL Server. + """ + + objects: VariableOrList[IngestionConfigParam] + """ + Required. Settings specifying tables to replicate and the destination for the replicated tables. + """ + + table_configuration: VariableOrOptional[TableSpecificConfigParam] + """ + Configuration settings to control the ingestion of tables. These settings are applied to all tables in the pipeline. + """ + + +IngestionPipelineDefinitionParam = ( + IngestionPipelineDefinitionDict | IngestionPipelineDefinition +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py new file mode 100644 index 0000000000..c7403cc07d --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py @@ -0,0 +1,27 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class ManualTrigger: + """""" + + @classmethod + def from_dict(cls, value: "ManualTriggerDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "ManualTriggerDict": + return _transform_to_json_value(self) # type:ignore + + +class ManualTriggerDict(TypedDict, total=False): + """""" + + +ManualTriggerParam = ManualTriggerDict | ManualTrigger diff --git a/experimental/python/databricks/bundles/pipelines/_models/notebook_library.py b/experimental/python/databricks/bundles/pipelines/_models/notebook_library.py new file mode 100644 index 0000000000..b8ac89205f --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/notebook_library.py @@ -0,0 +1,38 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class NotebookLibrary: + """""" + + path: VariableOrOptional[str] = None + """ + The absolute path of the notebook. + """ + + @classmethod + def from_dict(cls, value: "NotebookLibraryDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "NotebookLibraryDict": + return _transform_to_json_value(self) # type:ignore + + +class NotebookLibraryDict(TypedDict, total=False): + """""" + + path: VariableOrOptional[str] + """ + The absolute path of the notebook. 
+ """ + + +NotebookLibraryParam = NotebookLibraryDict | NotebookLibrary diff --git a/experimental/python/databricks/bundles/pipelines/_models/notifications.py b/experimental/python/databricks/bundles/pipelines/_models/notifications.py new file mode 100644 index 0000000000..06ecfb8dbe --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/notifications.py @@ -0,0 +1,64 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class Notifications: + """""" + + alerts: VariableOrList[str] = field(default_factory=list) + """ + A list of alerts that trigger the sending of notifications to the configured + destinations. The supported alerts are: + + * `on-update-success`: A pipeline update completes successfully. + * `on-update-failure`: Each time a pipeline update fails. + * `on-update-fatal-failure`: A pipeline update fails with a non-retryable (fatal) error. + * `on-flow-failure`: A single data flow fails. + + """ + + email_recipients: VariableOrList[str] = field(default_factory=list) + """ + A list of email addresses notified when a configured alert is triggered. + + """ + + @classmethod + def from_dict(cls, value: "NotificationsDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "NotificationsDict": + return _transform_to_json_value(self) # type:ignore + + +class NotificationsDict(TypedDict, total=False): + """""" + + alerts: VariableOrList[str] + """ + A list of alerts that trigger the sending of notifications to the configured + destinations. The supported alerts are: + + * `on-update-success`: A pipeline update completes successfully. + * `on-update-failure`: Each time a pipeline update fails. + * `on-update-fatal-failure`: A pipeline update fails with a non-retryable (fatal) error. + * `on-flow-failure`: A single data flow fails. + + """ + + email_recipients: VariableOrList[str] + """ + A list of email addresses notified when a configured alert is triggered. 
+ + """ + + +NotificationsParam = NotificationsDict | Notifications diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py index e3c944e3fa..96c5858403 100644 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py @@ -1,24 +1,177 @@ from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, TypedDict +from typing import TYPE_CHECKING, TypedDict -from databricks.bundles.core import Resource, VariableOrList, VariableOrOptional +from databricks.bundles.core._resource import Resource from databricks.bundles.core._transform import _transform from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import ( + VariableOrDict, + VariableOrList, + VariableOrOptional, +) +from databricks.bundles.pipelines._models.event_log_spec import ( + EventLogSpec, + EventLogSpecParam, +) +from databricks.bundles.pipelines._models.filters import ( + Filters, + FiltersParam, +) +from databricks.bundles.pipelines._models.ingestion_gateway_pipeline_definition import ( + IngestionGatewayPipelineDefinition, + IngestionGatewayPipelineDefinitionParam, +) +from databricks.bundles.pipelines._models.ingestion_pipeline_definition import ( + IngestionPipelineDefinition, + IngestionPipelineDefinitionParam, +) +from databricks.bundles.pipelines._models.notifications import ( + Notifications, + NotificationsParam, +) +from databricks.bundles.pipelines._models.pipeline_cluster import ( + PipelineCluster, + PipelineClusterParam, +) +from databricks.bundles.pipelines._models.pipeline_library import ( + PipelineLibrary, + PipelineLibraryParam, +) +from databricks.bundles.pipelines._models.pipeline_permission import ( + PipelinePermission, + PipelinePermissionParam, +) +from databricks.bundles.pipelines._models.restart_window import ( + RestartWindow, + RestartWindowParam, +) +from databricks.bundles.pipelines._models.run_as import RunAs, RunAsParam if TYPE_CHECKING: from typing_extensions import Self -# TODO generate Pipeline class from jsonschema - @dataclass(kw_only=True) class Pipeline(Resource): """""" - name: VariableOrOptional[str] + budget_policy_id: VariableOrOptional[str] = None + """ + :meta private: [EXPERIMENTAL] + + Budget policy of this pipeline. + """ + + catalog: VariableOrOptional[str] = None + """ + A catalog in Unity Catalog to publish data from this pipeline to. If `target` is specified, tables in this pipeline are published to a `target` schema inside `catalog` (for example, `catalog`.`target`.`table`). If `target` is not specified, no data is published to Unity Catalog. + """ + + channel: VariableOrOptional[str] = None + """ + DLT Release Channel that specifies which version to use. + """ + + clusters: VariableOrList[PipelineCluster] = field(default_factory=list) + """ + Cluster settings for this pipeline deployment. + """ + + configuration: VariableOrDict[str] = field(default_factory=dict) + """ + String-String configuration for this pipeline execution. + """ + + continuous: VariableOrOptional[bool] = None + """ + Whether the pipeline is continuous or triggered. This replaces `trigger`. + """ + + development: VariableOrOptional[bool] = None + """ + Whether the pipeline is in Development mode. Defaults to false. + """ + + edition: VariableOrOptional[str] = None + """ + Pipeline product edition. 
+ """ + + event_log: VariableOrOptional[EventLogSpec] = None + """ + Event log configuration for this pipeline + """ - # permission field is always present after normalization, add stub not to error on unknown property - permissions: VariableOrList[Any] = field(default_factory=list) + filters: VariableOrOptional[Filters] = None + """ + Filters on which Pipeline packages to include in the deployed graph. + """ + + gateway_definition: VariableOrOptional[IngestionGatewayPipelineDefinition] = None + """ + :meta private: [EXPERIMENTAL] + + The definition of a gateway pipeline to support change data capture. + """ + + id: VariableOrOptional[str] = None + """ + Unique identifier for this pipeline. + """ + + ingestion_definition: VariableOrOptional[IngestionPipelineDefinition] = None + """ + The configuration for a managed ingestion pipeline. These settings cannot be used with the 'libraries', 'schema', 'target', or 'catalog' settings. + """ + + libraries: VariableOrList[PipelineLibrary] = field(default_factory=list) + """ + Libraries or code needed by this deployment. + """ + + name: VariableOrOptional[str] = None + """ + Friendly identifier for this pipeline. + """ + + notifications: VariableOrList[Notifications] = field(default_factory=list) + """ + List of notification settings for this pipeline. + """ + + permissions: VariableOrList[PipelinePermission] = field(default_factory=list) + + photon: VariableOrOptional[bool] = None + """ + Whether Photon is enabled for this pipeline. + """ + + restart_window: VariableOrOptional[RestartWindow] = None + """ + :meta private: [EXPERIMENTAL] + + Restart window of this pipeline. + """ + + run_as: VariableOrOptional[RunAs] = None + """ + :meta private: [EXPERIMENTAL] + """ + + schema: VariableOrOptional[str] = None + """ + The default schema (database) where tables are read from or published to. + """ + + serverless: VariableOrOptional[bool] = None + """ + Whether serverless compute is enabled for this pipeline. + """ + + storage: VariableOrOptional[str] = None + """ + DBFS root directory for storing checkpoints and tables. + """ @classmethod def from_dict(cls, value: "PipelineDict") -> "Self": @@ -31,6 +184,7 @@ def as_dict(self) -> "PipelineDict": class PipelineDict(TypedDict, total=False): """""" +<<<<<<< HEAD name: VariableOrOptional[str] """ TODO @@ -38,3 +192,125 @@ class PipelineDict(TypedDict, total=False): PipelineParam = Pipeline | PipelineDict +======= + budget_policy_id: VariableOrOptional[str] + """ + :meta private: [EXPERIMENTAL] + + Budget policy of this pipeline. + """ + + catalog: VariableOrOptional[str] + """ + A catalog in Unity Catalog to publish data from this pipeline to. If `target` is specified, tables in this pipeline are published to a `target` schema inside `catalog` (for example, `catalog`.`target`.`table`). If `target` is not specified, no data is published to Unity Catalog. + """ + + channel: VariableOrOptional[str] + """ + DLT Release Channel that specifies which version to use. + """ + + clusters: VariableOrList[PipelineClusterParam] + """ + Cluster settings for this pipeline deployment. + """ + + configuration: VariableOrDict[str] + """ + String-String configuration for this pipeline execution. + """ + + continuous: VariableOrOptional[bool] + """ + Whether the pipeline is continuous or triggered. This replaces `trigger`. + """ + + development: VariableOrOptional[bool] + """ + Whether the pipeline is in Development mode. Defaults to false. + """ + + edition: VariableOrOptional[str] + """ + Pipeline product edition. 
+ """ + + event_log: VariableOrOptional[EventLogSpecParam] + """ + Event log configuration for this pipeline + """ + + filters: VariableOrOptional[FiltersParam] + """ + Filters on which Pipeline packages to include in the deployed graph. + """ + + gateway_definition: VariableOrOptional[IngestionGatewayPipelineDefinitionParam] + """ + :meta private: [EXPERIMENTAL] + + The definition of a gateway pipeline to support change data capture. + """ + + id: VariableOrOptional[str] + """ + Unique identifier for this pipeline. + """ + + ingestion_definition: VariableOrOptional[IngestionPipelineDefinitionParam] + """ + The configuration for a managed ingestion pipeline. These settings cannot be used with the 'libraries', 'schema', 'target', or 'catalog' settings. + """ + + libraries: VariableOrList[PipelineLibraryParam] + """ + Libraries or code needed by this deployment. + """ + + name: VariableOrOptional[str] + """ + Friendly identifier for this pipeline. + """ + + notifications: VariableOrList[NotificationsParam] + """ + List of notification settings for this pipeline. + """ + + permissions: VariableOrList[PipelinePermissionParam] + + photon: VariableOrOptional[bool] + """ + Whether Photon is enabled for this pipeline. + """ + + restart_window: VariableOrOptional[RestartWindowParam] + """ + :meta private: [EXPERIMENTAL] + + Restart window of this pipeline. + """ + + run_as: VariableOrOptional[RunAsParam] + """ + :meta private: [EXPERIMENTAL] + """ + + schema: VariableOrOptional[str] + """ + The default schema (database) where tables are read from or published to. + """ + + serverless: VariableOrOptional[bool] + """ + Whether serverless compute is enabled for this pipeline. + """ + + storage: VariableOrOptional[str] + """ + DBFS root directory for storing checkpoints and tables. + """ + + +PipelineParam = PipelineDict | Pipeline +>>>>>>> 2aabe4c0 (Add pipelines) diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster.py new file mode 100644 index 0000000000..3eb57baaf2 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster.py @@ -0,0 +1,332 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.compute._models.aws_attributes import ( + AwsAttributes, + AwsAttributesParam, +) +from databricks.bundles.compute._models.azure_attributes import ( + AzureAttributes, + AzureAttributesParam, +) +from databricks.bundles.compute._models.cluster_log_conf import ( + ClusterLogConf, + ClusterLogConfParam, +) +from databricks.bundles.compute._models.gcp_attributes import ( + GcpAttributes, + GcpAttributesParam, +) +from databricks.bundles.compute._models.init_script_info import ( + InitScriptInfo, + InitScriptInfoParam, +) +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import ( + VariableOrDict, + VariableOrList, + VariableOrOptional, +) +from databricks.bundles.pipelines._models.pipeline_cluster_autoscale import ( + PipelineClusterAutoscale, + PipelineClusterAutoscaleParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelineCluster: + """""" + + apply_policy_default_values: VariableOrOptional[bool] = None + """ + Note: This field won't be persisted. Only API users will check this field. 
+ """ + + autoscale: VariableOrOptional[PipelineClusterAutoscale] = None + """ + Parameters needed in order to automatically scale clusters up and down based on load. + Note: autoscaling works best with DB runtime versions 3.0 or later. + """ + + aws_attributes: VariableOrOptional[AwsAttributes] = None + """ + Attributes related to clusters running on Amazon Web Services. + If not specified at cluster creation, a set of default values will be used. + """ + + azure_attributes: VariableOrOptional[AzureAttributes] = None + """ + Attributes related to clusters running on Microsoft Azure. + If not specified at cluster creation, a set of default values will be used. + """ + + cluster_log_conf: VariableOrOptional[ClusterLogConf] = None + """ + The configuration for delivering spark logs to a long-term storage destination. + Only dbfs destinations are supported. Only one destination can be specified + for one cluster. If the conf is given, the logs will be delivered to the destination every + `5 mins`. The destination of driver logs is `$destination/$clusterId/driver`, while + the destination of executor logs is `$destination/$clusterId/executor`. + + """ + + custom_tags: VariableOrDict[str] = field(default_factory=dict) + """ + Additional tags for cluster resources. Databricks will tag all cluster resources (e.g., AWS + instances and EBS volumes) with these tags in addition to `default_tags`. Notes: + + - Currently, Databricks allows at most 45 custom tags + + - Clusters can only reuse cloud resources if the resources' tags are a subset of the cluster tags + """ + + driver_instance_pool_id: VariableOrOptional[str] = None + """ + The optional ID of the instance pool for the driver of the cluster belongs. + The pool cluster uses the instance pool with id (instance_pool_id) if the driver pool is not + assigned. + """ + + driver_node_type_id: VariableOrOptional[str] = None + """ + The node type of the Spark driver. + Note that this field is optional; if unset, the driver node type will be set as the same value + as `node_type_id` defined above. + """ + + enable_local_disk_encryption: VariableOrOptional[bool] = None + """ + Whether to enable local disk encryption for the cluster. + """ + + gcp_attributes: VariableOrOptional[GcpAttributes] = None + """ + Attributes related to clusters running on Google Cloud Platform. + If not specified at cluster creation, a set of default values will be used. + """ + + init_scripts: VariableOrList[InitScriptInfo] = field(default_factory=list) + """ + The configuration for storing init scripts. Any number of destinations can be specified. The scripts are executed sequentially in the order provided. If `cluster_log_conf` is specified, init script logs are sent to `//init_scripts`. + """ + + instance_pool_id: VariableOrOptional[str] = None + """ + The optional ID of the instance pool to which the cluster belongs. + """ + + label: VariableOrOptional[str] = None + """ + A label for the cluster specification, either `default` to configure the default cluster, or `maintenance` to configure the maintenance cluster. This field is optional. The default value is `default`. + """ + + node_type_id: VariableOrOptional[str] = None + """ + This field encodes, through a single value, the resources available to each of + the Spark nodes in this cluster. For example, the Spark nodes can be provisioned + and optimized for memory or compute intensive workloads. A list of available node + types can be retrieved by using the :method:clusters/listNodeTypes API call. 
+ + """ + + num_workers: VariableOrOptional[int] = None + """ + Number of worker nodes that this cluster should have. A cluster has one Spark Driver + and `num_workers` Executors for a total of `num_workers` + 1 Spark nodes. + + Note: When reading the properties of a cluster, this field reflects the desired number + of workers rather than the actual current number of workers. For instance, if a cluster + is resized from 5 to 10 workers, this field will immediately be updated to reflect + the target size of 10 workers, whereas the workers listed in `spark_info` will gradually + increase from 5 to 10 as the new nodes are provisioned. + """ + + policy_id: VariableOrOptional[str] = None + """ + The ID of the cluster policy used to create the cluster if applicable. + """ + + spark_conf: VariableOrDict[str] = field(default_factory=dict) + """ + An object containing a set of optional, user-specified Spark configuration key-value pairs. + See :method:clusters/create for more details. + + """ + + spark_env_vars: VariableOrDict[str] = field(default_factory=dict) + """ + An object containing a set of optional, user-specified environment variable key-value pairs. + Please note that key-value pair of the form (X,Y) will be exported as is (i.e., + `export X='Y'`) while launching the driver and workers. + + In order to specify an additional set of `SPARK_DAEMON_JAVA_OPTS`, we recommend appending + them to `$SPARK_DAEMON_JAVA_OPTS` as shown in the example below. This ensures that all + default databricks managed environmental variables are included as well. + + Example Spark environment variables: + `{"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"}` or + `{"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"}` + """ + + ssh_public_keys: VariableOrList[str] = field(default_factory=list) + """ + SSH public key contents that will be added to each Spark node in this cluster. The + corresponding private keys can be used to login with the user name `ubuntu` on port `2200`. + Up to 10 keys can be specified. + """ + + @classmethod + def from_dict(cls, value: "PipelineClusterDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineClusterDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineClusterDict(TypedDict, total=False): + """""" + + apply_policy_default_values: VariableOrOptional[bool] + """ + Note: This field won't be persisted. Only API users will check this field. + """ + + autoscale: VariableOrOptional[PipelineClusterAutoscaleParam] + """ + Parameters needed in order to automatically scale clusters up and down based on load. + Note: autoscaling works best with DB runtime versions 3.0 or later. + """ + + aws_attributes: VariableOrOptional[AwsAttributesParam] + """ + Attributes related to clusters running on Amazon Web Services. + If not specified at cluster creation, a set of default values will be used. + """ + + azure_attributes: VariableOrOptional[AzureAttributesParam] + """ + Attributes related to clusters running on Microsoft Azure. + If not specified at cluster creation, a set of default values will be used. + """ + + cluster_log_conf: VariableOrOptional[ClusterLogConfParam] + """ + The configuration for delivering spark logs to a long-term storage destination. + Only dbfs destinations are supported. Only one destination can be specified + for one cluster. If the conf is given, the logs will be delivered to the destination every + `5 mins`. 
The destination of driver logs is `$destination/$clusterId/driver`, while + the destination of executor logs is `$destination/$clusterId/executor`. + + """ + + custom_tags: VariableOrDict[str] + """ + Additional tags for cluster resources. Databricks will tag all cluster resources (e.g., AWS + instances and EBS volumes) with these tags in addition to `default_tags`. Notes: + + - Currently, Databricks allows at most 45 custom tags + + - Clusters can only reuse cloud resources if the resources' tags are a subset of the cluster tags + """ + + driver_instance_pool_id: VariableOrOptional[str] + """ + The optional ID of the instance pool for the driver of the cluster belongs. + The pool cluster uses the instance pool with id (instance_pool_id) if the driver pool is not + assigned. + """ + + driver_node_type_id: VariableOrOptional[str] + """ + The node type of the Spark driver. + Note that this field is optional; if unset, the driver node type will be set as the same value + as `node_type_id` defined above. + """ + + enable_local_disk_encryption: VariableOrOptional[bool] + """ + Whether to enable local disk encryption for the cluster. + """ + + gcp_attributes: VariableOrOptional[GcpAttributesParam] + """ + Attributes related to clusters running on Google Cloud Platform. + If not specified at cluster creation, a set of default values will be used. + """ + + init_scripts: VariableOrList[InitScriptInfoParam] + """ + The configuration for storing init scripts. Any number of destinations can be specified. The scripts are executed sequentially in the order provided. If `cluster_log_conf` is specified, init script logs are sent to `//init_scripts`. + """ + + instance_pool_id: VariableOrOptional[str] + """ + The optional ID of the instance pool to which the cluster belongs. + """ + + label: VariableOrOptional[str] + """ + A label for the cluster specification, either `default` to configure the default cluster, or `maintenance` to configure the maintenance cluster. This field is optional. The default value is `default`. + """ + + node_type_id: VariableOrOptional[str] + """ + This field encodes, through a single value, the resources available to each of + the Spark nodes in this cluster. For example, the Spark nodes can be provisioned + and optimized for memory or compute intensive workloads. A list of available node + types can be retrieved by using the :method:clusters/listNodeTypes API call. + + """ + + num_workers: VariableOrOptional[int] + """ + Number of worker nodes that this cluster should have. A cluster has one Spark Driver + and `num_workers` Executors for a total of `num_workers` + 1 Spark nodes. + + Note: When reading the properties of a cluster, this field reflects the desired number + of workers rather than the actual current number of workers. For instance, if a cluster + is resized from 5 to 10 workers, this field will immediately be updated to reflect + the target size of 10 workers, whereas the workers listed in `spark_info` will gradually + increase from 5 to 10 as the new nodes are provisioned. + """ + + policy_id: VariableOrOptional[str] + """ + The ID of the cluster policy used to create the cluster if applicable. + """ + + spark_conf: VariableOrDict[str] + """ + An object containing a set of optional, user-specified Spark configuration key-value pairs. + See :method:clusters/create for more details. + + """ + + spark_env_vars: VariableOrDict[str] + """ + An object containing a set of optional, user-specified environment variable key-value pairs. 
+ Please note that key-value pair of the form (X,Y) will be exported as is (i.e., + `export X='Y'`) while launching the driver and workers. + + In order to specify an additional set of `SPARK_DAEMON_JAVA_OPTS`, we recommend appending + them to `$SPARK_DAEMON_JAVA_OPTS` as shown in the example below. This ensures that all + default databricks managed environmental variables are included as well. + + Example Spark environment variables: + `{"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"}` or + `{"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"}` + """ + + ssh_public_keys: VariableOrList[str] + """ + SSH public key contents that will be added to each Spark node in this cluster. The + corresponding private keys can be used to login with the user name `ubuntu` on port `2200`. + Up to 10 keys can be specified. + """ + + +PipelineClusterParam = PipelineClusterDict | PipelineCluster diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale.py new file mode 100644 index 0000000000..973e66a67b --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale.py @@ -0,0 +1,74 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOr, VariableOrOptional +from databricks.bundles.pipelines._models.pipeline_cluster_autoscale_mode import ( + PipelineClusterAutoscaleMode, + PipelineClusterAutoscaleModeParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelineClusterAutoscale: + """""" + + max_workers: VariableOr[int] + """ + The maximum number of workers to which the cluster can scale up when overloaded. `max_workers` must be strictly greater than `min_workers`. + """ + + min_workers: VariableOr[int] + """ + The minimum number of workers the cluster can scale down to when underutilized. + It is also the initial number of workers the cluster will have after creation. + """ + + mode: VariableOrOptional[PipelineClusterAutoscaleMode] = None + """ + Databricks Enhanced Autoscaling optimizes cluster utilization by automatically + allocating cluster resources based on workload volume, with minimal impact to + the data processing latency of your pipelines. Enhanced Autoscaling is available + for `updates` clusters only. The legacy autoscaling feature is used for `maintenance` + clusters. + + """ + + @classmethod + def from_dict(cls, value: "PipelineClusterAutoscaleDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineClusterAutoscaleDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineClusterAutoscaleDict(TypedDict, total=False): + """""" + + max_workers: VariableOr[int] + """ + The maximum number of workers to which the cluster can scale up when overloaded. `max_workers` must be strictly greater than `min_workers`. + """ + + min_workers: VariableOr[int] + """ + The minimum number of workers the cluster can scale down to when underutilized. + It is also the initial number of workers the cluster will have after creation. 
+ """ + + mode: VariableOrOptional[PipelineClusterAutoscaleModeParam] + """ + Databricks Enhanced Autoscaling optimizes cluster utilization by automatically + allocating cluster resources based on workload volume, with minimal impact to + the data processing latency of your pipelines. Enhanced Autoscaling is available + for `updates` clusters only. The legacy autoscaling feature is used for `maintenance` + clusters. + + """ + + +PipelineClusterAutoscaleParam = PipelineClusterAutoscaleDict | PipelineClusterAutoscale diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale_mode.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale_mode.py new file mode 100644 index 0000000000..16f46dbffd --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_cluster_autoscale_mode.py @@ -0,0 +1,21 @@ +from enum import Enum +from typing import Literal + + +class PipelineClusterAutoscaleMode(Enum): + """ + Databricks Enhanced Autoscaling optimizes cluster utilization by automatically + allocating cluster resources based on workload volume, with minimal impact to + the data processing latency of your pipelines. Enhanced Autoscaling is available + for `updates` clusters only. The legacy autoscaling feature is used for `maintenance` + clusters. + + """ + + ENHANCED = "ENHANCED" + LEGACY = "LEGACY" + + +PipelineClusterAutoscaleModeParam = ( + Literal["ENHANCED", "LEGACY"] | PipelineClusterAutoscaleMode +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py new file mode 100644 index 0000000000..7494817560 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.deployment_kind import ( + DeploymentKind, + DeploymentKindParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelineDeployment: + """""" + + kind: VariableOrOptional[DeploymentKind] = None + """ + The deployment method that manages the pipeline. + """ + + metadata_file_path: VariableOrOptional[str] = None + """ + The path to the file containing metadata about the deployment. + """ + + @classmethod + def from_dict(cls, value: "PipelineDeploymentDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineDeploymentDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineDeploymentDict(TypedDict, total=False): + """""" + + kind: VariableOrOptional[DeploymentKindParam] + """ + The deployment method that manages the pipeline. + """ + + metadata_file_path: VariableOrOptional[str] + """ + The path to the file containing metadata about the deployment. 
+ """ + + +PipelineDeploymentParam = PipelineDeploymentDict | PipelineDeployment diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_library.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_library.py new file mode 100644 index 0000000000..3e9b7094cb --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_library.py @@ -0,0 +1,96 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.compute._models.maven_library import ( + MavenLibrary, + MavenLibraryParam, +) +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.file_library import ( + FileLibrary, + FileLibraryParam, +) +from databricks.bundles.pipelines._models.notebook_library import ( + NotebookLibrary, + NotebookLibraryParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelineLibrary: + """""" + + file: VariableOrOptional[FileLibrary] = None + """ + The path to a file that defines a pipeline and is stored in the Databricks Repos. + + """ + + jar: VariableOrOptional[str] = None + """ + :meta private: [EXPERIMENTAL] + + URI of the jar to be installed. Currently only DBFS is supported. + + """ + + maven: VariableOrOptional[MavenLibrary] = None + """ + :meta private: [EXPERIMENTAL] + + Specification of a maven library to be installed. + + """ + + notebook: VariableOrOptional[NotebookLibrary] = None + """ + The path to a notebook that defines a pipeline and is stored in the Databricks workspace. + + """ + + @classmethod + def from_dict(cls, value: "PipelineLibraryDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineLibraryDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineLibraryDict(TypedDict, total=False): + """""" + + file: VariableOrOptional[FileLibraryParam] + """ + The path to a file that defines a pipeline and is stored in the Databricks Repos. + + """ + + jar: VariableOrOptional[str] + """ + :meta private: [EXPERIMENTAL] + + URI of the jar to be installed. Currently only DBFS is supported. + + """ + + maven: VariableOrOptional[MavenLibraryParam] + """ + :meta private: [EXPERIMENTAL] + + Specification of a maven library to be installed. + + """ + + notebook: VariableOrOptional[NotebookLibraryParam] + """ + The path to a notebook that defines a pipeline and is stored in the Databricks workspace. 
+ + """ + + +PipelineLibraryParam = PipelineLibraryDict | PipelineLibrary diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py new file mode 100644 index 0000000000..404b788a33 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py @@ -0,0 +1,60 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOr, VariableOrOptional +from databricks.bundles.pipelines._models.pipeline_permission_level import ( + PipelinePermissionLevel, + PipelinePermissionLevelParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelinePermission: + """""" + + level: VariableOr[PipelinePermissionLevel] + + group_name: VariableOrOptional[str] = None + + service_principal_name: VariableOrOptional[str] = None + + user_name: VariableOrOptional[str] = None + + def __post_init__(self): + union_fields = [ + self.user_name, + self.service_principal_name, + self.group_name, + ] + + if sum(f is not None for f in union_fields) != 1: + raise ValueError( + "PipelinePermission must specify exactly one of 'user_name', 'service_principal_name', 'group_name'" + ) + + @classmethod + def from_dict(cls, value: "PipelinePermissionDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelinePermissionDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelinePermissionDict(TypedDict, total=False): + """""" + + level: VariableOr[PipelinePermissionLevelParam] + + group_name: VariableOrOptional[str] + + service_principal_name: VariableOrOptional[str] + + user_name: VariableOrOptional[str] + + +PipelinePermissionParam = PipelinePermissionDict | PipelinePermission diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission_level.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission_level.py new file mode 100644 index 0000000000..2c7845fef3 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission_level.py @@ -0,0 +1,14 @@ +from enum import Enum +from typing import Literal + + +class PipelinePermissionLevel(Enum): + CAN_MANAGE = "CAN_MANAGE" + CAN_RUN = "CAN_RUN" + CAN_VIEW = "CAN_VIEW" + IS_OWNER = "IS_OWNER" + + +PipelinePermissionLevelParam = ( + Literal["CAN_MANAGE", "CAN_RUN", "CAN_VIEW", "IS_OWNER"] | PipelinePermissionLevel +) diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py new file mode 100644 index 0000000000..7d89cbcca6 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py @@ -0,0 +1,44 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.cron_trigger import ( + CronTrigger, + CronTriggerParam, +) +from databricks.bundles.pipelines._models.manual_trigger import ( + ManualTrigger, + ManualTriggerParam, +) + +if 
TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class PipelineTrigger: + """""" + + cron: VariableOrOptional[CronTrigger] = None + + manual: VariableOrOptional[ManualTrigger] = None + + @classmethod + def from_dict(cls, value: "PipelineTriggerDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineTriggerDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineTriggerDict(TypedDict, total=False): + """""" + + cron: VariableOrOptional[CronTriggerParam] + + manual: VariableOrOptional[ManualTriggerParam] + + +PipelineTriggerParam = PipelineTriggerDict | PipelineTrigger diff --git a/experimental/python/databricks/bundles/pipelines/_models/report_spec.py b/experimental/python/databricks/bundles/pipelines/_models/report_spec.py new file mode 100644 index 0000000000..4e32d9f51d --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/report_spec.py @@ -0,0 +1,82 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.table_specific_config import ( + TableSpecificConfig, + TableSpecificConfigParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class ReportSpec: + """""" + + destination_catalog: VariableOrOptional[str] = None + """ + Required. Destination catalog to store table. + """ + + destination_schema: VariableOrOptional[str] = None + """ + Required. Destination schema to store table. + """ + + destination_table: VariableOrOptional[str] = None + """ + Required. Destination table name. The pipeline fails if a table with that name already exists. + """ + + source_url: VariableOrOptional[str] = None + """ + Required. Report URL in the source system. + """ + + table_configuration: VariableOrOptional[TableSpecificConfig] = None + """ + Configuration settings to control the ingestion of tables. These settings override the table_configuration defined in the IngestionPipelineDefinition object. + """ + + @classmethod + def from_dict(cls, value: "ReportSpecDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "ReportSpecDict": + return _transform_to_json_value(self) # type:ignore + + +class ReportSpecDict(TypedDict, total=False): + """""" + + destination_catalog: VariableOrOptional[str] + """ + Required. Destination catalog to store table. + """ + + destination_schema: VariableOrOptional[str] + """ + Required. Destination schema to store table. + """ + + destination_table: VariableOrOptional[str] + """ + Required. Destination table name. The pipeline fails if a table with that name already exists. + """ + + source_url: VariableOrOptional[str] + """ + Required. Report URL in the source system. + """ + + table_configuration: VariableOrOptional[TableSpecificConfigParam] + """ + Configuration settings to control the ingestion of tables. These settings override the table_configuration defined in the IngestionPipelineDefinition object. 
+ """ + + +ReportSpecParam = ReportSpecDict | ReportSpec diff --git a/experimental/python/databricks/bundles/pipelines/_models/restart_window.py b/experimental/python/databricks/bundles/pipelines/_models/restart_window.py new file mode 100644 index 0000000000..2385a32c7a --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/restart_window.py @@ -0,0 +1,71 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import ( + VariableOr, + VariableOrList, + VariableOrOptional, +) +from databricks.bundles.pipelines._models.day_of_week import DayOfWeek, DayOfWeekParam + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class RestartWindow: + """ + :meta private: [EXPERIMENTAL] + """ + + start_hour: VariableOr[int] + """ + An integer between 0 and 23 denoting the start hour for the restart window in the 24-hour day. + Continuous pipeline restart is triggered only within a five-hour window starting at this hour. + """ + + days_of_week: VariableOrList[DayOfWeek] = field(default_factory=list) + """ + Days of week in which the restart is allowed to happen (within a five-hour window starting at start_hour). + If not specified all days of the week will be used. + """ + + time_zone_id: VariableOrOptional[str] = None + """ + Time zone id of restart window. See https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone.html for details. + If not specified, UTC will be used. + """ + + @classmethod + def from_dict(cls, value: "RestartWindowDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "RestartWindowDict": + return _transform_to_json_value(self) # type:ignore + + +class RestartWindowDict(TypedDict, total=False): + """""" + + start_hour: VariableOr[int] + """ + An integer between 0 and 23 denoting the start hour for the restart window in the 24-hour day. + Continuous pipeline restart is triggered only within a five-hour window starting at this hour. + """ + + days_of_week: VariableOrList[DayOfWeekParam] + """ + Days of week in which the restart is allowed to happen (within a five-hour window starting at start_hour). + If not specified all days of the week will be used. + """ + + time_zone_id: VariableOrOptional[str] + """ + Time zone id of restart window. See https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone.html for details. + If not specified, UTC will be used. + """ + + +RestartWindowParam = RestartWindowDict | RestartWindow diff --git a/experimental/python/databricks/bundles/pipelines/_models/run_as.py b/experimental/python/databricks/bundles/pipelines/_models/run_as.py new file mode 100644 index 0000000000..dadceecac7 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/run_as.py @@ -0,0 +1,54 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class RunAs: + """ + :meta private: [EXPERIMENTAL] + + Write-only setting, available only in Create/Update calls. 
Specifies the user or service principal that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error is thrown. + """ + + service_principal_name: VariableOrOptional[str] = None + """ + Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role. + """ + + user_name: VariableOrOptional[str] = None + """ + The email of an active workspace user. Users can only set this field to their own email. + """ + + @classmethod + def from_dict(cls, value: "RunAsDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "RunAsDict": + return _transform_to_json_value(self) # type:ignore + + +class RunAsDict(TypedDict, total=False): + """""" + + service_principal_name: VariableOrOptional[str] + """ + Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role. + """ + + user_name: VariableOrOptional[str] + """ + The email of an active workspace user. Users can only set this field to their own email. + """ + + +RunAsParam = RunAsDict | RunAs diff --git a/experimental/python/databricks/bundles/pipelines/_models/schema_spec.py b/experimental/python/databricks/bundles/pipelines/_models/schema_spec.py new file mode 100644 index 0000000000..dd47de000a --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/schema_spec.py @@ -0,0 +1,82 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.table_specific_config import ( + TableSpecificConfig, + TableSpecificConfigParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class SchemaSpec: + """""" + + destination_catalog: VariableOrOptional[str] = None + """ + Required. Destination catalog to store tables. + """ + + destination_schema: VariableOrOptional[str] = None + """ + Required. Destination schema to store tables in. Tables with the same name as the source tables are created in this destination schema. The pipeline fails If a table with the same name already exists. + """ + + source_catalog: VariableOrOptional[str] = None + """ + The source catalog name. Might be optional depending on the type of source. + """ + + source_schema: VariableOrOptional[str] = None + """ + Required. Schema name in the source database. + """ + + table_configuration: VariableOrOptional[TableSpecificConfig] = None + """ + Configuration settings to control the ingestion of tables. These settings are applied to all tables in this schema and override the table_configuration defined in the IngestionPipelineDefinition object. + """ + + @classmethod + def from_dict(cls, value: "SchemaSpecDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "SchemaSpecDict": + return _transform_to_json_value(self) # type:ignore + + +class SchemaSpecDict(TypedDict, total=False): + """""" + + destination_catalog: VariableOrOptional[str] + """ + Required. Destination catalog to store tables. + """ + + destination_schema: VariableOrOptional[str] + """ + Required. Destination schema to store tables in. Tables with the same name as the source tables are created in this destination schema. 
The pipeline fails If a table with the same name already exists. + """ + + source_catalog: VariableOrOptional[str] + """ + The source catalog name. Might be optional depending on the type of source. + """ + + source_schema: VariableOrOptional[str] + """ + Required. Schema name in the source database. + """ + + table_configuration: VariableOrOptional[TableSpecificConfigParam] + """ + Configuration settings to control the ingestion of tables. These settings are applied to all tables in this schema and override the table_configuration defined in the IngestionPipelineDefinition object. + """ + + +SchemaSpecParam = SchemaSpecDict | SchemaSpec diff --git a/experimental/python/databricks/bundles/pipelines/_models/table_spec.py b/experimental/python/databricks/bundles/pipelines/_models/table_spec.py new file mode 100644 index 0000000000..28e00c3c12 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/table_spec.py @@ -0,0 +1,102 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrOptional +from databricks.bundles.pipelines._models.table_specific_config import ( + TableSpecificConfig, + TableSpecificConfigParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class TableSpec: + """""" + + destination_catalog: VariableOrOptional[str] = None + """ + Required. Destination catalog to store table. + """ + + destination_schema: VariableOrOptional[str] = None + """ + Required. Destination schema to store table. + """ + + destination_table: VariableOrOptional[str] = None + """ + Optional. Destination table name. The pipeline fails if a table with that name already exists. If not set, the source table name is used. + """ + + source_catalog: VariableOrOptional[str] = None + """ + Source catalog name. Might be optional depending on the type of source. + """ + + source_schema: VariableOrOptional[str] = None + """ + Schema name in the source database. Might be optional depending on the type of source. + """ + + source_table: VariableOrOptional[str] = None + """ + Required. Table name in the source database. + """ + + table_configuration: VariableOrOptional[TableSpecificConfig] = None + """ + Configuration settings to control the ingestion of tables. These settings override the table_configuration defined in the IngestionPipelineDefinition object and the SchemaSpec. + """ + + @classmethod + def from_dict(cls, value: "TableSpecDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "TableSpecDict": + return _transform_to_json_value(self) # type:ignore + + +class TableSpecDict(TypedDict, total=False): + """""" + + destination_catalog: VariableOrOptional[str] + """ + Required. Destination catalog to store table. + """ + + destination_schema: VariableOrOptional[str] + """ + Required. Destination schema to store table. + """ + + destination_table: VariableOrOptional[str] + """ + Optional. Destination table name. The pipeline fails if a table with that name already exists. If not set, the source table name is used. + """ + + source_catalog: VariableOrOptional[str] + """ + Source catalog name. Might be optional depending on the type of source. + """ + + source_schema: VariableOrOptional[str] + """ + Schema name in the source database. Might be optional depending on the type of source. 
+ """ + + source_table: VariableOrOptional[str] + """ + Required. Table name in the source database. + """ + + table_configuration: VariableOrOptional[TableSpecificConfigParam] + """ + Configuration settings to control the ingestion of tables. These settings override the table_configuration defined in the IngestionPipelineDefinition object and the SchemaSpec. + """ + + +TableSpecParam = TableSpecDict | TableSpec diff --git a/experimental/python/databricks/bundles/pipelines/_models/table_specific_config.py b/experimental/python/databricks/bundles/pipelines/_models/table_specific_config.py new file mode 100644 index 0000000000..204447de62 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/table_specific_config.py @@ -0,0 +1,80 @@ +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value +from databricks.bundles.core._variable import VariableOrList, VariableOrOptional +from databricks.bundles.pipelines._models.table_specific_config_scd_type import ( + TableSpecificConfigScdType, + TableSpecificConfigScdTypeParam, +) + +if TYPE_CHECKING: + from typing_extensions import Self + + +@dataclass(kw_only=True) +class TableSpecificConfig: + """""" + + primary_keys: VariableOrList[str] = field(default_factory=list) + """ + The primary key of the table used to apply changes. + """ + + salesforce_include_formula_fields: VariableOrOptional[bool] = None + """ + :meta private: [EXPERIMENTAL] + + If true, formula fields defined in the table are included in the ingestion. This setting is only valid for the Salesforce connector + """ + + scd_type: VariableOrOptional[TableSpecificConfigScdType] = None + """ + :meta private: [EXPERIMENTAL] + + The SCD type to use to ingest the table. + """ + + sequence_by: VariableOrList[str] = field(default_factory=list) + """ + The column names specifying the logical order of events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. + """ + + @classmethod + def from_dict(cls, value: "TableSpecificConfigDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "TableSpecificConfigDict": + return _transform_to_json_value(self) # type:ignore + + +class TableSpecificConfigDict(TypedDict, total=False): + """""" + + primary_keys: VariableOrList[str] + """ + The primary key of the table used to apply changes. + """ + + salesforce_include_formula_fields: VariableOrOptional[bool] + """ + :meta private: [EXPERIMENTAL] + + If true, formula fields defined in the table are included in the ingestion. This setting is only valid for the Salesforce connector + """ + + scd_type: VariableOrOptional[TableSpecificConfigScdTypeParam] + """ + :meta private: [EXPERIMENTAL] + + The SCD type to use to ingest the table. + """ + + sequence_by: VariableOrList[str] + """ + The column names specifying the logical order of events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. 
+ """ + + +TableSpecificConfigParam = TableSpecificConfigDict | TableSpecificConfig diff --git a/experimental/python/databricks/bundles/pipelines/_models/table_specific_config_scd_type.py b/experimental/python/databricks/bundles/pipelines/_models/table_specific_config_scd_type.py new file mode 100644 index 0000000000..1ad679e4b0 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/table_specific_config_scd_type.py @@ -0,0 +1,18 @@ +from enum import Enum +from typing import Literal + + +class TableSpecificConfigScdType(Enum): + """ + :meta private: [EXPERIMENTAL] + + The SCD type to use to ingest the table. + """ + + SCD_TYPE_1 = "SCD_TYPE_1" + SCD_TYPE_2 = "SCD_TYPE_2" + + +TableSpecificConfigScdTypeParam = ( + Literal["SCD_TYPE_1", "SCD_TYPE_2"] | TableSpecificConfigScdType +) From 3148938cfbfe1470078aa5533bff1ddf17cbe607 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Tue, 8 Apr 2025 16:16:49 +0200 Subject: [PATCH 2/4] Remove deprecated/unused --- .../bundles/pipelines/_models/cron_trigger.py | 36 ------------- .../pipelines/_models/deployment_kind.py | 15 ------ .../pipelines/_models/manual_trigger.py | 27 ---------- .../pipelines/_models/pipeline_deployment.py | 52 ------------------- .../pipelines/_models/pipeline_permission.py | 12 ----- .../pipelines/_models/pipeline_trigger.py | 44 ---------------- 6 files changed, 186 deletions(-) delete mode 100644 experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py delete mode 100644 experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py delete mode 100644 experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py delete mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py delete mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py diff --git a/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py deleted file mode 100644 index ab67a42658..0000000000 --- a/experimental/python/databricks/bundles/pipelines/_models/cron_trigger.py +++ /dev/null @@ -1,36 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, TypedDict - -from databricks.bundles.core._transform import _transform -from databricks.bundles.core._transform_to_json import _transform_to_json_value -from databricks.bundles.core._variable import VariableOrOptional - -if TYPE_CHECKING: - from typing_extensions import Self - - -@dataclass(kw_only=True) -class CronTrigger: - """""" - - quartz_cron_schedule: VariableOrOptional[str] = None - - timezone_id: VariableOrOptional[str] = None - - @classmethod - def from_dict(cls, value: "CronTriggerDict") -> "Self": - return _transform(cls, value) - - def as_dict(self) -> "CronTriggerDict": - return _transform_to_json_value(self) # type:ignore - - -class CronTriggerDict(TypedDict, total=False): - """""" - - quartz_cron_schedule: VariableOrOptional[str] - - timezone_id: VariableOrOptional[str] - - -CronTriggerParam = CronTriggerDict | CronTrigger diff --git a/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py b/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py deleted file mode 100644 index 800382ef18..0000000000 --- a/experimental/python/databricks/bundles/pipelines/_models/deployment_kind.py +++ /dev/null @@ -1,15 +0,0 @@ -from enum import Enum -from typing import Literal - - -class DeploymentKind(Enum): - """ - The deployment 
method that manages the pipeline: - - BUNDLE: The pipeline is managed by a Databricks Asset Bundle. - - """ - - BUNDLE = "BUNDLE" - - -DeploymentKindParam = Literal["BUNDLE"] | DeploymentKind diff --git a/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py deleted file mode 100644 index c7403cc07d..0000000000 --- a/experimental/python/databricks/bundles/pipelines/_models/manual_trigger.py +++ /dev/null @@ -1,27 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, TypedDict - -from databricks.bundles.core._transform import _transform -from databricks.bundles.core._transform_to_json import _transform_to_json_value - -if TYPE_CHECKING: - from typing_extensions import Self - - -@dataclass(kw_only=True) -class ManualTrigger: - """""" - - @classmethod - def from_dict(cls, value: "ManualTriggerDict") -> "Self": - return _transform(cls, value) - - def as_dict(self) -> "ManualTriggerDict": - return _transform_to_json_value(self) # type:ignore - - -class ManualTriggerDict(TypedDict, total=False): - """""" - - -ManualTriggerParam = ManualTriggerDict | ManualTrigger diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py deleted file mode 100644 index 7494817560..0000000000 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline_deployment.py +++ /dev/null @@ -1,52 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, TypedDict - -from databricks.bundles.core._transform import _transform -from databricks.bundles.core._transform_to_json import _transform_to_json_value -from databricks.bundles.core._variable import VariableOrOptional -from databricks.bundles.pipelines._models.deployment_kind import ( - DeploymentKind, - DeploymentKindParam, -) - -if TYPE_CHECKING: - from typing_extensions import Self - - -@dataclass(kw_only=True) -class PipelineDeployment: - """""" - - kind: VariableOrOptional[DeploymentKind] = None - """ - The deployment method that manages the pipeline. - """ - - metadata_file_path: VariableOrOptional[str] = None - """ - The path to the file containing metadata about the deployment. - """ - - @classmethod - def from_dict(cls, value: "PipelineDeploymentDict") -> "Self": - return _transform(cls, value) - - def as_dict(self) -> "PipelineDeploymentDict": - return _transform_to_json_value(self) # type:ignore - - -class PipelineDeploymentDict(TypedDict, total=False): - """""" - - kind: VariableOrOptional[DeploymentKindParam] - """ - The deployment method that manages the pipeline. - """ - - metadata_file_path: VariableOrOptional[str] - """ - The path to the file containing metadata about the deployment. 
- """ - - -PipelineDeploymentParam = PipelineDeploymentDict | PipelineDeployment diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py index 404b788a33..0a3ed95538 100644 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline_permission.py @@ -25,18 +25,6 @@ class PipelinePermission: user_name: VariableOrOptional[str] = None - def __post_init__(self): - union_fields = [ - self.user_name, - self.service_principal_name, - self.group_name, - ] - - if sum(f is not None for f in union_fields) != 1: - raise ValueError( - "PipelinePermission must specify exactly one of 'user_name', 'service_principal_name', 'group_name'" - ) - @classmethod def from_dict(cls, value: "PipelinePermissionDict") -> "Self": return _transform(cls, value) diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py deleted file mode 100644 index 7d89cbcca6..0000000000 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline_trigger.py +++ /dev/null @@ -1,44 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, TypedDict - -from databricks.bundles.core._transform import _transform -from databricks.bundles.core._transform_to_json import _transform_to_json_value -from databricks.bundles.core._variable import VariableOrOptional -from databricks.bundles.pipelines._models.cron_trigger import ( - CronTrigger, - CronTriggerParam, -) -from databricks.bundles.pipelines._models.manual_trigger import ( - ManualTrigger, - ManualTriggerParam, -) - -if TYPE_CHECKING: - from typing_extensions import Self - - -@dataclass(kw_only=True) -class PipelineTrigger: - """""" - - cron: VariableOrOptional[CronTrigger] = None - - manual: VariableOrOptional[ManualTrigger] = None - - @classmethod - def from_dict(cls, value: "PipelineTriggerDict") -> "Self": - return _transform(cls, value) - - def as_dict(self) -> "PipelineTriggerDict": - return _transform_to_json_value(self) # type:ignore - - -class PipelineTriggerDict(TypedDict, total=False): - """""" - - cron: VariableOrOptional[CronTriggerParam] - - manual: VariableOrOptional[ManualTriggerParam] - - -PipelineTriggerParam = PipelineTriggerDict | PipelineTrigger From eaebb55acc5f05ef5a4289c04a9319bf45d1c822 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Tue, 8 Apr 2025 19:01:08 +0200 Subject: [PATCH 3/4] Fix codegen problems --- .../bundles/pipelines/_models/pipeline.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py index 96c5858403..99693c04df 100644 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py @@ -58,7 +58,7 @@ class Pipeline(Resource): budget_policy_id: VariableOrOptional[str] = None """ :meta private: [EXPERIMENTAL] - + Budget policy of this pipeline. """ @@ -110,7 +110,7 @@ class Pipeline(Resource): gateway_definition: VariableOrOptional[IngestionGatewayPipelineDefinition] = None """ :meta private: [EXPERIMENTAL] - + The definition of a gateway pipeline to support change data capture. 
""" @@ -149,7 +149,7 @@ class Pipeline(Resource): restart_window: VariableOrOptional[RestartWindow] = None """ :meta private: [EXPERIMENTAL] - + Restart window of this pipeline. """ @@ -173,6 +173,11 @@ class Pipeline(Resource): DBFS root directory for storing checkpoints and tables. """ + target: VariableOrOptional[str] = None + """ + Target schema (database) to add tables in this pipeline to. Exactly one of `schema` or `target` must be specified. To publish to Unity Catalog, also specify `catalog`. This legacy field is deprecated for pipeline creation in favor of the `schema` field. + """ + @classmethod def from_dict(cls, value: "PipelineDict") -> "Self": return _transform(cls, value) @@ -184,19 +189,10 @@ def as_dict(self) -> "PipelineDict": class PipelineDict(TypedDict, total=False): """""" -<<<<<<< HEAD - name: VariableOrOptional[str] - """ - TODO - """ - - -PipelineParam = Pipeline | PipelineDict -======= budget_policy_id: VariableOrOptional[str] """ :meta private: [EXPERIMENTAL] - + Budget policy of this pipeline. """ @@ -248,7 +244,7 @@ class PipelineDict(TypedDict, total=False): gateway_definition: VariableOrOptional[IngestionGatewayPipelineDefinitionParam] """ :meta private: [EXPERIMENTAL] - + The definition of a gateway pipeline to support change data capture. """ @@ -287,7 +283,7 @@ class PipelineDict(TypedDict, total=False): restart_window: VariableOrOptional[RestartWindowParam] """ :meta private: [EXPERIMENTAL] - + Restart window of this pipeline. """ @@ -311,6 +307,10 @@ class PipelineDict(TypedDict, total=False): DBFS root directory for storing checkpoints and tables. """ + target: VariableOrOptional[str] + """ + Target schema (database) to add tables in this pipeline to. Exactly one of `schema` or `target` must be specified. To publish to Unity Catalog, also specify `catalog`. This legacy field is deprecated for pipeline creation in favor of the `schema` field. + """ + PipelineParam = PipelineDict | Pipeline ->>>>>>> 2aabe4c0 (Add pipelines) From 3b385af337b3d3b4e2f850dff993e81e6c89b497 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Wed, 9 Apr 2025 14:55:59 +0200 Subject: [PATCH 4/4] Disable fail on warnings for docs There are warnings because the same classes are included into jobs and pipeline packages. We will fix it when we change code generation to vendor compute package into each resource using it. 
The output for docs already looks correct --- experimental/python/Makefile | 2 +- experimental/python/docs/ext/autodoc_databricks_bundles.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/experimental/python/Makefile b/experimental/python/Makefile index 0d69f7ddc4..1878aab767 100644 --- a/experimental/python/Makefile +++ b/experimental/python/Makefile @@ -6,7 +6,7 @@ fmt: docs: # Python 3.12+ is needed for get_overloads - uv run --python 3.12 sphinx-build docs docs/_output --fail-on-warning --show-traceback --nitpicky --fresh-env --keep-going + uv run --python 3.12 sphinx-build docs docs/_output --show-traceback --nitpicky --fresh-env --keep-going lint: # check if lock matches the project metadata diff --git a/experimental/python/docs/ext/autodoc_databricks_bundles.py b/experimental/python/docs/ext/autodoc_databricks_bundles.py index a2808094f7..8a6335a58d 100644 --- a/experimental/python/docs/ext/autodoc_databricks_bundles.py +++ b/experimental/python/docs/ext/autodoc_databricks_bundles.py @@ -297,7 +297,6 @@ def skip_member(app, what, name, obj, skip, options): # skip databricks.bundles.._module.FooDict classes # because we already document Foo dataclass that is equivalent. if what == "module" and name.endswith("Dict") and "._models." in obj.__module__: - print(what, name, obj, app) return True return skip
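
Reviewer note (not part of the applied patches): a minimal sketch of how a few of the generated models compose, assuming the `databricks.bundles.pipelines` package re-exports these names at the top level as this series sets up (otherwise import them from the corresponding `_models` modules). Class, field, and enum names are taken from the diffs above; the concrete values are illustrative only.

    from databricks.bundles.pipelines import (
        PipelineClusterAutoscale,
        PipelineClusterAutoscaleMode,
        PipelinePermission,
        PipelinePermissionLevel,
        RestartWindow,
        TableSpecificConfig,
        TableSpecificConfigScdType,
    )

    # Dataclass construction: fields are keyword-only, enums are passed as members.
    autoscale = PipelineClusterAutoscale(
        min_workers=1,
        max_workers=5,
        mode=PipelineClusterAutoscaleMode.ENHANCED,
    )

    # The TypedDict form is accepted via from_dict(); per the *Param type aliases,
    # string literals may stand in for enum members.
    same_autoscale = PipelineClusterAutoscale.from_dict(
        {"min_workers": 1, "max_workers": 5, "mode": "ENHANCED"}
    )

    # A permission entry names one principal plus a level.
    viewer = PipelinePermission(
        level=PipelinePermissionLevel.CAN_VIEW,
        user_name="someone@example.com",  # illustrative principal
    )

    # Per-table ingestion settings: SCD type 2, keyed by `id`, ordered by `updated_at`.
    table_config = TableSpecificConfig(
        primary_keys=["id"],
        scd_type=TableSpecificConfigScdType.SCD_TYPE_2,
        sequence_by=["updated_at"],
    )

    # Restart window starting at hour 2; days_of_week defaults to all days and the
    # time zone defaults to UTC per the field docs.
    window = RestartWindow(start_hour=2)

    # Every model round-trips back to a plain dict.
    print(autoscale.as_dict(), viewer.as_dict(), table_config.as_dict(), window.as_dict())

The `*Param` aliases (`Dict | dataclass`) at the bottom of each module are what nested `*Dict` fields reference, so nested values can be supplied in either form when using `from_dict`.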