From 56ac00ad99351855ea39248975155d92e62b0619 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Wed, 26 Mar 2025 14:30:52 +0100 Subject: [PATCH 1/3] [Python] Support pipelines --- .../python/pipelines-support/databricks.yml | 16 ++ .../python/pipelines-support/mutators.py | 11 ++ .../python/pipelines-support/output.txt | 34 ++++ .../python/pipelines-support/resources.py | 9 + .../bundle/python/pipelines-support/script | 6 + .../bundle/python/pipelines-support/test.toml | 8 + .../python/databricks/bundles/build.py | 91 ++++++--- .../databricks/bundles/core/__init__.py | 7 +- .../bundles/core/_resource_mutator.py | 33 ++++ .../databricks/bundles/core/_resource_type.py | 48 +++++ .../databricks/bundles/core/_resources.py | 44 +++++ .../databricks/bundles/pipelines/__init__.py | 6 + .../bundles/pipelines/_models/__init__.py | 0 .../bundles/pipelines/_models/pipeline.py | 40 ++++ .../databricks_tests/core/test_resources.py | 181 +++++++++++++++++- .../python/databricks_tests/test_build.py | 73 ++++++- .../docs/databricks.bundles.pipelines.rst | 11 ++ .../docs/ext/autodoc_databricks_bundles.py | 23 ++- experimental/python/docs/index.rst | 1 + 19 files changed, 590 insertions(+), 52 deletions(-) create mode 100644 acceptance/bundle/python/pipelines-support/databricks.yml create mode 100644 acceptance/bundle/python/pipelines-support/mutators.py create mode 100644 acceptance/bundle/python/pipelines-support/output.txt create mode 100644 acceptance/bundle/python/pipelines-support/resources.py create mode 100644 acceptance/bundle/python/pipelines-support/script create mode 100644 acceptance/bundle/python/pipelines-support/test.toml create mode 100644 experimental/python/databricks/bundles/core/_resource_type.py create mode 100644 experimental/python/databricks/bundles/pipelines/__init__.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/__init__.py create mode 100644 experimental/python/databricks/bundles/pipelines/_models/pipeline.py create mode 100644 experimental/python/docs/databricks.bundles.pipelines.rst diff --git a/acceptance/bundle/python/pipelines-support/databricks.yml b/acceptance/bundle/python/pipelines-support/databricks.yml new file mode 100644 index 0000000000..b30197752d --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/databricks.yml @@ -0,0 +1,16 @@ +bundle: + name: my_project + +sync: { paths: [ ] } # don't need to copy files + +experimental: + python: + resources: + - "resources:load_resources" + mutators: + - "mutators:update_pipeline" + +resources: + pipelines: + my_pipeline_2: + name: "My Pipeline 2" diff --git a/acceptance/bundle/python/pipelines-support/mutators.py b/acceptance/bundle/python/pipelines-support/mutators.py new file mode 100644 index 0000000000..d1b62aa7b4 --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/mutators.py @@ -0,0 +1,11 @@ +from dataclasses import replace + +from databricks.bundles.core import pipeline_mutator +from databricks.bundles.pipelines import Pipeline + + +@pipeline_mutator +def update_pipeline(pipeline: Pipeline) -> Pipeline: + assert isinstance(pipeline.name, str) + + return replace(pipeline, name=f"{pipeline.name} (updated)") diff --git a/acceptance/bundle/python/pipelines-support/output.txt b/acceptance/bundle/python/pipelines-support/output.txt new file mode 100644 index 0000000000..97d4ebe2e7 --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/output.txt @@ -0,0 +1,34 @@ + +>>> uv run [UV_ARGS] -q [CLI] bundle validate --output json +{ + "experimental": { + "python": { + 
"mutators": [ + "mutators:update_pipeline" + ], + "resources": [ + "resources:load_resources" + ] + } + }, + "resources": { + "pipelines": { + "my_pipeline_1": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json" + }, + "name": "My Pipeline 1 (updated)", + "permissions": [] + }, + "my_pipeline_2": { + "deployment": { + "kind": "BUNDLE", + "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json" + }, + "name": "My Pipeline 2 (updated)", + "permissions": [] + } + } + } +} diff --git a/acceptance/bundle/python/pipelines-support/resources.py b/acceptance/bundle/python/pipelines-support/resources.py new file mode 100644 index 0000000000..92108db8fa --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/resources.py @@ -0,0 +1,9 @@ +from databricks.bundles.core import Resources + + +def load_resources() -> Resources: + resources = Resources() + + resources.add_pipeline("my_pipeline_1", {"name": "My Pipeline 1"}) + + return resources diff --git a/acceptance/bundle/python/pipelines-support/script b/acceptance/bundle/python/pipelines-support/script new file mode 100644 index 0000000000..1849f8e77a --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/script @@ -0,0 +1,6 @@ +UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" + +trace uv run $UV_ARGS -q $CLI bundle validate --output json | \ + jq "pick(.experimental.python, .resources)" + +rm -fr .databricks __pycache__ diff --git a/acceptance/bundle/python/pipelines-support/test.toml b/acceptance/bundle/python/pipelines-support/test.toml new file mode 100644 index 0000000000..d73d9b2ee7 --- /dev/null +++ b/acceptance/bundle/python/pipelines-support/test.toml @@ -0,0 +1,8 @@ +Local = true +Cloud = false # tests don't interact with APIs + +[EnvMatrix] +UV_ARGS = [ + # only works with the latest version of the wheel + "--with [DATABRICKS_BUNDLES_WHEEL] --no-cache", +] diff --git a/experimental/python/databricks/bundles/build.py b/experimental/python/databricks/bundles/build.py index 6a867ef620..4d7a119a98 100644 --- a/experimental/python/databricks/bundles/build.py +++ b/experimental/python/databricks/bundles/build.py @@ -7,12 +7,16 @@ import sys from copy import deepcopy from dataclasses import dataclass, field, fields, replace -from typing import Callable, Optional, TextIO +from typing import ( + Callable, + Optional, + TextIO, +) from databricks.bundles.core import Bundle, Diagnostics, Location, Resources from databricks.bundles.core._resource_mutator import ResourceMutator +from databricks.bundles.core._resource_type import _ResourceType from databricks.bundles.core._transform import _transform -from databricks.bundles.jobs import Job __all__ = [] @@ -81,21 +85,23 @@ def _load_resources_from_input(input: dict) -> tuple[Resources, Diagnostics]: diagnostics = Diagnostics() input_resources = input.get("resources", {}) - input_jobs = input_resources.get("jobs", {}) - for resource_name, job_dict in input_jobs.items(): - try: - job = Job.from_dict(job_dict) + for tpe in _ResourceType.all(): + input_resources_by_tpe = input_resources.get(tpe.plural_name, {}) - resources.add_job(resource_name, job) - except Exception as exc: - diagnostics = diagnostics.extend( - Diagnostics.from_exception( - exc=exc, - summary="Error while loading job", - path=("resources", "jobs", resource_name), + for resource_name, resource_dict in input_resources_by_tpe.items(): + try: + resource = 
_transform(tpe.resource_type, resource_dict) + + resources.add_resource(resource_name=resource_name, resource=resource) + except Exception as exc: + diagnostics = diagnostics.extend( + Diagnostics.from_exception( + exc=exc, + summary=f"Error while loading {tpe.singular_name}", + path=("resources", tpe.plural_name, resource_name), + ) ) - ) return resources, diagnostics @@ -105,30 +111,48 @@ def _apply_mutators( resources: Resources, mutator_functions: list[ResourceMutator], ) -> tuple[Resources, Diagnostics]: - for resource_name, job in resources.jobs.items(): + diagnostics = Diagnostics() + + for tpe in _ResourceType.all(): + resources, diagnostics = diagnostics.extend_tuple( + _apply_mutators_for_type(bundle, resources, tpe, mutator_functions) + ) + + return resources, diagnostics + + +def _apply_mutators_for_type( + bundle: Bundle, + resources: Resources, + tpe: _ResourceType, + mutator_functions: list[ResourceMutator], +) -> tuple[Resources, Diagnostics]: + resources_dict = getattr(resources, tpe.plural_name) + + for resource_name, resource in resources_dict.items(): for mutator in mutator_functions: - if mutator.resource_type != Job: + if mutator.resource_type != tpe.resource_type: continue location = Location.from_callable(mutator.function) try: if _get_num_args(mutator.function) == 1: - new_job = mutator.function(job) + new_resource = mutator.function(resource) else: # defensive copy so that one function doesn't affect another - new_job = mutator.function(deepcopy(bundle), job) + new_resource = mutator.function(deepcopy(bundle), resource) - # mutating job in-place works, but we can't tell when it happens, + # mutating resource in-place works, but we can't tell when it happens, # so we only update location if new instance is returned - if new_job is not job: + if new_resource is not resource: if location: resources.add_location( - ("resources", "jobs", resource_name), location + ("resources", tpe.plural_name, resource_name), location ) - resources.jobs[resource_name] = new_job - job = new_job + resources_dict[resource_name] = new_resource + resource = new_resource except Exception as exc: mutator_name = mutator.function.__name__ @@ -136,7 +160,7 @@ def _apply_mutators( exc=exc, summary=f"Failed to apply '{mutator_name}' mutator", location=location, - path=("resources", "jobs", resource_name), + path=("resources", tpe.plural_name, resource_name), ) return resources, Diagnostics() @@ -219,12 +243,19 @@ def _append_resources(bundle: dict, resources: Resources) -> dict: new_bundle = bundle.copy() - if resources.jobs: - new_bundle["resources"] = new_bundle.get("resources", {}) - new_bundle["resources"]["jobs"] = new_bundle["resources"].get("jobs", {}) + for tpe in _ResourceType.all(): + resources_dict = getattr(resources, tpe.plural_name) - for resource_name, resource in resources.jobs.items(): - new_bundle["resources"]["jobs"][resource_name] = resource.as_dict() + if resources_dict: + new_bundle["resources"] = new_bundle.get("resources", {}) + new_bundle["resources"][tpe.plural_name] = new_bundle["resources"].get( + tpe.plural_name, {} + ) + + for resource_name, resource in resources_dict.items(): + new_bundle["resources"][tpe.plural_name][resource_name] = ( + resource.as_dict() + ) return new_bundle @@ -385,7 +416,7 @@ def _load_resource_mutator( if instance and not isinstance(instance, ResourceMutator): return None, Diagnostics.create_error( - f"'{name}' in module '{module_name}' is not instance of ResourceMutator, did you decorate it with @job_mutator?", + f"'{name}' in module 
'{module_name}' is not instance of ResourceMutator, did you decorate it with @job_mutator or @pipeline_mutator?",
     )
 
     return instance, diagnostics
diff --git a/experimental/python/databricks/bundles/core/__init__.py b/experimental/python/databricks/bundles/core/__init__.py
index 1a56262721..3f7bb77038 100644
--- a/experimental/python/databricks/bundles/core/__init__.py
+++ b/experimental/python/databricks/bundles/core/__init__.py
@@ -13,6 +13,7 @@
     "VariableOrList",
     "VariableOrOptional",
     "job_mutator",
+    "pipeline_mutator",
     "load_resources_from_current_package_module",
     "load_resources_from_module",
     "load_resources_from_modules",
@@ -34,7 +35,11 @@
 )
 from databricks.bundles.core._location import Location
 from databricks.bundles.core._resource import Resource
-from databricks.bundles.core._resource_mutator import ResourceMutator, job_mutator
+from databricks.bundles.core._resource_mutator import (
+    ResourceMutator,
+    job_mutator,
+    pipeline_mutator,
+)
 from databricks.bundles.core._resources import Resources
 from databricks.bundles.core._variable import (
     Variable,
diff --git a/experimental/python/databricks/bundles/core/_resource_mutator.py b/experimental/python/databricks/bundles/core/_resource_mutator.py
index b871c0f3a5..5f6a0a2fc4 100644
--- a/experimental/python/databricks/bundles/core/_resource_mutator.py
+++ b/experimental/python/databricks/bundles/core/_resource_mutator.py
@@ -7,6 +7,7 @@
 
 if TYPE_CHECKING:
     from databricks.bundles.jobs._models.job import Job
+    from databricks.bundles.pipelines._models.pipeline import Pipeline
 
 _T = TypeVar("_T", bound=Resource)
 
@@ -94,3 +95,35 @@ def my_job_mutator(bundle: Bundle, job: Job) -> Job:
     from databricks.bundles.jobs._models.job import Job
 
     return ResourceMutator(resource_type=Job, function=function)
+
+
+@overload
+def pipeline_mutator(
+    function: Callable[[Bundle, "Pipeline"], "Pipeline"],
+) -> ResourceMutator["Pipeline"]: ...
+
+
+@overload
+def pipeline_mutator(
+    function: Callable[["Pipeline"], "Pipeline"],
+) -> ResourceMutator["Pipeline"]: ...
+
+
+def pipeline_mutator(function: Callable) -> ResourceMutator["Pipeline"]:
+    """
+    Decorator for defining a pipeline mutator. The function should return a new instance
+    of the pipeline with the desired changes instead of mutating the input pipeline.
+
+    Example:
+
+    .. code-block:: python
+
+        @pipeline_mutator
+        def my_pipeline_mutator(bundle: Bundle, pipeline: Pipeline) -> Pipeline:
+            return replace(pipeline, name="my_pipeline")
+
+    :param function: Function that mutates a pipeline.
+    """
+    from databricks.bundles.pipelines._models.pipeline import Pipeline
+
+    return ResourceMutator(resource_type=Pipeline, function=function)
diff --git a/experimental/python/databricks/bundles/core/_resource_type.py b/experimental/python/databricks/bundles/core/_resource_type.py
new file mode 100644
index 0000000000..427ce552c6
--- /dev/null
+++ b/experimental/python/databricks/bundles/core/_resource_type.py
@@ -0,0 +1,48 @@
+from dataclasses import dataclass
+from typing import Type
+
+from databricks.bundles.core._resource import Resource
+
+
+@dataclass(kw_only=True, frozen=True)
+class _ResourceType:
+    """
+    NB: this class should stay internal-only and NOT be exported from databricks.bundles.core
+    """
+
+    resource_type: Type[Resource]
+
+    singular_name: str
+    """
+    Singular name, used in method names (e.g. "add_job"), in error messages, and as a parameter name.
+    """
+
+    plural_name: str
+    """
+    Plural name, the same as in the "resources" bundle section.
+    """
+ """ + + @classmethod + def all(cls) -> tuple["_ResourceType", ...]: + """ + Returns all supported resource types. + """ + + # intentionally lazily load all resource types to avoid imports from databricks.bundles.core to + # be imported in databricks.bundles. + + from databricks.bundles.jobs._models.job import Job + from databricks.bundles.pipelines._models.pipeline import Pipeline + + return ( + _ResourceType( + resource_type=Job, + singular_name="job", + plural_name="jobs", + ), + _ResourceType( + resource_type=Pipeline, + plural_name="pipelines", + singular_name="pipeline", + ), + ) diff --git a/experimental/python/databricks/bundles/core/_resources.py b/experimental/python/databricks/bundles/core/_resources.py index 8dbc9cf402..fd776b8852 100644 --- a/experimental/python/databricks/bundles/core/_resources.py +++ b/experimental/python/databricks/bundles/core/_resources.py @@ -7,6 +7,7 @@ if TYPE_CHECKING: from databricks.bundles.jobs._models.job import Job, JobParam + from databricks.bundles.pipelines._models.pipeline import Pipeline, PipelineParam __all__ = ["Resources"] @@ -55,6 +56,7 @@ def load_resources(bundle: Bundle) -> Resources: def __init__(self): self._jobs = dict[str, "Job"]() + self._pipelines = dict[str, "Pipeline"]() self._locations = dict[tuple[str, ...], Location]() self._diagnostics = Diagnostics() @@ -62,6 +64,10 @@ def __init__(self): def jobs(self) -> dict[str, "Job"]: return self._jobs + @property + def pipelines(self) -> dict[str, "Pipeline"]: + return self._pipelines + @property def diagnostics(self) -> Diagnostics: """ @@ -86,12 +92,15 @@ def add_resource( """ from databricks.bundles.jobs import Job + from databricks.bundles.pipelines import Pipeline location = location or Location.from_stack_frame(depth=1) match resource: case Job(): self.add_job(resource_name, resource, location=location) + case Pipeline(): + self.add_pipeline(resource_name, resource, location=location) case _: raise ValueError(f"Unsupported resource type: {type(resource)}") @@ -127,6 +136,38 @@ def add_job( self._jobs[resource_name] = job + def add_pipeline( + self, + resource_name: str, + pipeline: "PipelineParam", + *, + location: Optional[Location] = None, + ) -> None: + """ + Adds a pipeline to the collection of resources. Resource name must be unique across all pipelines. + + :param resource_name: unique identifier for the pipeline + :param pipeline: the pipeline to add, can be Pipeline or dict + :param location: optional location of the pipeline in the source code + """ + from databricks.bundles.pipelines import Pipeline + + pipeline = _transform(Pipeline, pipeline) + path = ("resources", "pipelines", resource_name) + location = location or Location.from_stack_frame(depth=1) + + if self._pipelines.get(resource_name): + self.add_diagnostic_error( + msg=f"Duplicate resource name '{resource_name}' for a pipeline. Resource names must be unique.", + location=location, + path=path, + ) + else: + if location: + self.add_location(path, location) + + self._pipelines[resource_name] = pipeline + def add_location(self, path: tuple[str, ...], location: Location) -> None: """ Associate source code location with a path in the bundle configuration. 
@@ -200,6 +241,9 @@ def add_resources(self, other: "Resources") -> None: for name, job in other.jobs.items(): self.add_job(name, job) + for name, pipeline in other.pipelines.items(): + self.add_pipeline(name, pipeline) + for path, location in other._locations.items(): self.add_location(path, location) diff --git a/experimental/python/databricks/bundles/pipelines/__init__.py b/experimental/python/databricks/bundles/pipelines/__init__.py new file mode 100644 index 0000000000..a5da315c35 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/__init__.py @@ -0,0 +1,6 @@ +from databricks.bundles.pipelines._models.pipeline import Pipeline, PipelineParam + +__all__ = [ + "Pipeline", + "PipelineParam", +] diff --git a/experimental/python/databricks/bundles/pipelines/_models/__init__.py b/experimental/python/databricks/bundles/pipelines/_models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py new file mode 100644 index 0000000000..2b5dbbe076 --- /dev/null +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass +from typing import TYPE_CHECKING, TypedDict + +from databricks.bundles.core import Resource, VariableOrOptional +from databricks.bundles.core._transform import _transform +from databricks.bundles.core._transform_to_json import _transform_to_json_value + +if TYPE_CHECKING: + from typing_extensions import Self + +# TODO generate this class and its dependencies + + +@dataclass(kw_only=True) +class Pipeline(Resource): + """""" + + name: VariableOrOptional[str] + """ + TODO + """ + + @classmethod + def from_dict(cls, value: "PipelineDict") -> "Self": + return _transform(cls, value) + + def as_dict(self) -> "PipelineDict": + return _transform_to_json_value(self) # type:ignore + + +class PipelineDict(TypedDict, total=False): + """""" + + name: VariableOrOptional[str] + """ + TODO + """ + + +PipelineParam = Pipeline | PipelineDict diff --git a/experimental/python/databricks_tests/core/test_resources.py b/experimental/python/databricks_tests/core/test_resources.py index 539eedb928..f300672f78 100644 --- a/experimental/python/databricks_tests/core/test_resources.py +++ b/experimental/python/databricks_tests/core/test_resources.py @@ -1,5 +1,65 @@ +from dataclasses import dataclass, replace +from typing import Callable + +import pytest + from databricks.bundles.core import Location, Resources, Severity +from databricks.bundles.core._resource import Resource +from databricks.bundles.core._resource_mutator import ( + ResourceMutator, + job_mutator, + pipeline_mutator, +) +from databricks.bundles.core._resource_type import _ResourceType from databricks.bundles.jobs._models.job import Job +from databricks.bundles.pipelines._models.pipeline import Pipeline + + +@dataclass(kw_only=True) +class TestCase: + add_resource: Callable + dict_example: dict + dataclass_example: Resource + mutator: Callable + + +resource_types = {tpe.resource_type: tpe for tpe in _ResourceType.all()} +test_cases = [ + ( + TestCase( + add_resource=Resources.add_job, + dict_example={"name": "My job"}, + dataclass_example=Job(name="My job"), + mutator=job_mutator, + ), + resource_types[Job], + ), + ( + TestCase( + add_resource=Resources.add_pipeline, + dict_example={"name": "My pipeline"}, + dataclass_example=Pipeline(name="My pipeline"), + mutator=pipeline_mutator, + ), + 
resource_types[Pipeline],
+    ),
+]
+test_case_ids = [tpe.plural_name for _, tpe in test_cases]
+
+
+def test_has_all_test_cases():
+    for tpe in _ResourceType.all():
+        found = False
+
+        for _, test_case_tpe in test_cases:
+            if test_case_tpe == tpe:
+                found = True
+                break
+
+        assert found, f"Missing test case for '{tpe.plural_name}'"
+
+
+# Job-specific tests are kept alongside the parametrized ones: they sanity-check the setup and give more readable examples
 
 
 def test_add_job():
@@ -10,6 +70,22 @@
     assert resources.jobs == {"my_job": Job(name="My job")}
 
 
+@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids)
+def test_add_resource_type(tc: TestCase, tpe: _ResourceType):
+    resources = Resources()
+
+    tc.add_resource(
+        resources,
+        **{
+            "resource_name": "my_resource",
+            tpe.singular_name: tc.dataclass_example,
+        },
+    )
+
+    resource_dict = getattr(resources, tpe.plural_name)
+    assert resource_dict == {"my_resource": tc.dataclass_example}
+
+
 def test_add_job_dict():
     resources = Resources()
 
@@ -18,6 +94,22 @@
     assert resources.jobs == {"my_job": Job(name="My job")}
 
 
+@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids)
+def test_add_resource_type_dict(tc: TestCase, tpe: _ResourceType):
+    resources = Resources()
+
+    tc.add_resource(
+        resources,
+        **{
+            "resource_name": "my_resource",
+            tpe.singular_name: tc.dict_example,
+        },
+    )
+
+    resource_dict = getattr(resources, tpe.plural_name)
+    assert resource_dict == {"my_resource": tc.dataclass_example}
+
+
 def test_add_job_location():
     resources = Resources()
     location = Location(file="my_file", line=1, column=2)
 
@@ -27,6 +119,25 @@
     assert resources._locations == {("resources", "jobs", "my_job"): location}
 
 
+@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids)
+def test_add_resource_type_location(tc: TestCase, tpe: _ResourceType):
+    resources = Resources()
+    location = Location(file="my_file", line=1, column=2)
+
+    tc.add_resource(
+        resources,
+        **{
+            "resource_name": "my_resource",
+            tpe.singular_name: tc.dict_example,
+            "location": location,
+        },
+    )
+
+    assert resources._locations == {
+        ("resources", tpe.plural_name, "my_resource"): location
+    }
+
+
 def test_add_job_location_automatic():
     resources = Resources()
 
@@ -40,22 +151,74 @@
     assert location.column and location.column > 0
 
 
-def test_add_resource_job():
+@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids)
+def test_add_resource_type_location_automatic(tc: TestCase, tpe: _ResourceType):
     resources = Resources()
 
-    resources.add_resource("my_job", Job(name="My job"))
+    tc.add_resource(
+        resources,
+        **{
+            "resource_name": "my_resource",
+            tpe.singular_name: tc.dict_example,
+        },
+    )
 
-    assert resources.jobs == {"my_job": Job(name="My job")}
+    assert resources._locations.keys() == {
+        ("resources", tpe.plural_name, "my_resource")
+    }
 
+    [location] = resources._locations.values()
+    assert location.file == __file__
+    assert location.line and location.line > 0
+    assert location.column and location.column > 0
 
-def test_add_duplicate_job():
+
+@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids)
+def test_add_resource(tc: TestCase, tpe: _ResourceType):
     resources = Resources()
 
-    resources.add_job("my_job", {"name": "My job"})
-    resources.add_job("my_job", {"name": "My job (2)"})
+    resources.add_resource("my_resource", tc.dataclass_example)
 
-    # it's important not to override jobs, because, for instance, they can come from YAML
-    assert resources.jobs == {"my_job": Job(name="My job")}
+
resources_dict = getattr(resources, tpe.plural_name) + assert resources_dict == {"my_resource": tc.dataclass_example} + + +@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids) +def test_add_resources(tc: TestCase, tpe: _ResourceType): + resources_1 = Resources() + resources_2 = Resources() + + resources_2.add_resource("my_resource", tc.dataclass_example) + resources_1.add_resources(resources_2) + + resources_dict = getattr(resources_1, tpe.plural_name) + assert resources_dict == {"my_resource": tc.dataclass_example} + + +@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids) +def test_mutator(tc: TestCase, tpe: _ResourceType): + @tc.mutator + def my_func(bundle, resource): + return resource + + assert isinstance(my_func, ResourceMutator) + assert my_func.resource_type == tpe.resource_type + + +@pytest.mark.parametrize("tc,tpe", test_cases, ids=test_case_ids) +def test_add_duplicate_resource(tc: TestCase, tpe: _ResourceType): + resources = Resources() + + copy_1 = replace(tc.dataclass_example) + copy_2 = replace(tc.dataclass_example) + + resources.add_resource("my_resource", copy_1) + resources.add_resource("my_resource", copy_2) + + # it's important not to override resources, because, for instance, they can come from YAML + resources_dict = getattr(resources, tpe.plural_name) + assert resources_dict["my_resource"] is copy_1 + assert resources_dict["my_resource"] is not copy_2 assert len(resources.diagnostics.items) == 1 [item] = resources.diagnostics.items @@ -63,7 +226,7 @@ def test_add_duplicate_job(): assert item.severity == Severity.ERROR assert ( item.summary - == "Duplicate resource name 'my_job' for a job. Resource names must be unique." + == f"Duplicate resource name 'my_resource' for a {tpe.singular_name}. Resource names must be unique." 
) diff --git a/experimental/python/databricks_tests/test_build.py b/experimental/python/databricks_tests/test_build.py index 2d08e138a4..5419d70d18 100644 --- a/experimental/python/databricks_tests/test_build.py +++ b/experimental/python/databricks_tests/test_build.py @@ -10,6 +10,7 @@ _Conf, _load_object, _load_resources, + _load_resources_from_input, _parse_args, _parse_bundle_info, _relativize_location, @@ -25,7 +26,9 @@ Severity, job_mutator, ) +from databricks.bundles.core._resource_mutator import pipeline_mutator from databricks.bundles.jobs import Job +from databricks.bundles.pipelines._models.pipeline import Pipeline def test_write_diagnostics(): @@ -200,13 +203,17 @@ def test_append_resources(): "jobs": { "job_0": {"name": "job_0"}, "job_1": {"name": "job_1"}, - } + }, + "pipelines": { + "pipeline_0": {"name": "pipeline_0"}, + }, }, } resources = Resources() resources.add_job("job_1", Job(name="new name", description="new description")) resources.add_job("job_2", Job(name="job_2")) + resources.add_pipeline("pipeline_1", Pipeline(name="pipeline_1")) out = _append_resources(input, resources) @@ -217,11 +224,44 @@ def test_append_resources(): "job_0": {"name": "job_0"}, "job_1": {"name": "new name", "description": "new description"}, "job_2": {"name": "job_2"}, - } + }, + "pipelines": { + "pipeline_0": {"name": "pipeline_0"}, + "pipeline_1": {"name": "pipeline_1"}, + }, }, } +def test_load_resources_from_input(): + resources, diagnostics = _load_resources_from_input( + { + "resources": { + "jobs": { + "job_0": {"name": "Job 0"}, + "job_1": {"name": "Job 1"}, + }, + "pipelines": { + "pipeline_0": {"name": "Pipeline 0"}, + "pipeline_1": {"name": "Pipeline 1"}, + }, + }, + }, + ) + + assert diagnostics == Diagnostics() + + assert resources.jobs == { + "job_0": Job(name="Job 0"), + "job_1": Job(name="Job 1"), + } + + assert resources.pipelines == { + "pipeline_0": Pipeline(name="Pipeline 0"), + "pipeline_1": Pipeline(name="Pipeline 1"), + } + + def test_parse_args(): args = _parse_args( [ @@ -302,7 +342,7 @@ def test_conf_from_dict(): ) -def test_mutators(): +def test_job_mutators(): bundle = Bundle(target="default") resources = Resources() resources.add_job("job_0", Job(tags={"tag": "value"})) @@ -337,6 +377,33 @@ def add_second_tag(bundle: Bundle, job: Job) -> Job: } +def test_pipeline_mutators(): + bundle = Bundle(target="default") + resources = Resources() + resources.add_pipeline("pipeline_0", {"name": "My Pipeline"}) + + @pipeline_mutator + def update_pipeline_name(bundle: Bundle, pipeline: Pipeline) -> Pipeline: + name = bundle.resolve_variable(pipeline.name) + + return replace(pipeline, name=f"{name} (updated)") + + new_resources, diagnostics = _apply_mutators( + bundle=bundle, + resources=resources, + mutator_functions=[update_pipeline_name], + ) + + expected_location = Location.from_callable(update_pipeline_name.function) + + assert not diagnostics.has_error() + assert ( + new_resources._locations[("resources", "pipelines", "pipeline_0")] + == expected_location + ) + assert new_resources.pipelines["pipeline_0"].name == "My Pipeline (updated)" + + def test_mutators_unmodified(): bundle = Bundle(target="default") diff --git a/experimental/python/docs/databricks.bundles.pipelines.rst b/experimental/python/docs/databricks.bundles.pipelines.rst new file mode 100644 index 0000000000..77a2017832 --- /dev/null +++ b/experimental/python/docs/databricks.bundles.pipelines.rst @@ -0,0 +1,11 @@ +Pipelines +=============================== + +.. 
currentmodule:: databricks.bundles.pipelines + +**Package:** ``databricks.bundles.pipelines`` + +Classes +--------------- + +.. automodule:: databricks.bundles.pipelines diff --git a/experimental/python/docs/ext/autodoc_databricks_bundles.py b/experimental/python/docs/ext/autodoc_databricks_bundles.py index c1b8814279..a2808094f7 100644 --- a/experimental/python/docs/ext/autodoc_databricks_bundles.py +++ b/experimental/python/docs/ext/autodoc_databricks_bundles.py @@ -13,6 +13,7 @@ from sphinx.util.inspect import stringify_signature from sphinx.util.typing import ExtensionMetadata +from databricks.bundles.core._resource_type import _ResourceType from databricks.bundles.core._transform import _unwrap_variable, _unwrap_optional @@ -160,11 +161,9 @@ def process_signature(app, what, name, obj, options, signature, return_annotatio if name.startswith("databricks.bundles.core."): # return signature, return_annotation sig = simplify_sig(sig, unwrap_variable=False) - elif name.startswith("databricks.bundles.jobs.") and name.endswith(".as_dict"): + elif name.startswith("databricks.bundles.") and name.endswith(".as_dict"): sig = simplify_as_dict_sig(sig) - elif name.startswith("databricks.bundles.jobs.") and name.endswith( - ".from_dict" - ): + elif name.startswith("databricks.bundles.") and name.endswith(".from_dict"): sig = simplify_from_dict_sig(sig) # this is the only recursive type we have, resolution is elif name == "databricks.bundles.jobs.ForEachTask.create": @@ -260,10 +259,14 @@ def simplify_sig(sig, unwrap_variable: bool) -> inspect.Signature: "databricks.bundles.core._variable._T": "databricks.bundles.core.T", "databricks.bundles.core._diagnostics._T": "databricks.bundles.core.T", "databricks.bundles.core._resource_mutator._T": "databricks.bundles.core.T", - # use dataclasses instead of typed dicts used in databricks.bundles.core - "JobParam": "databricks.bundles.jobs.Job", } +# use dataclasses instead of typed dicts used in databricks.bundles.core +for tpe in _ResourceType.all(): + rewrite_aliases[tpe.resource_type.__name__ + "Param"] = ( + tpe.resource_type.__module__ + "." + tpe.resource_type.__name__ + ) + def resolve_internal_aliases(app, doctree): """ @@ -308,9 +311,11 @@ def setup(app: Sphinx) -> ExtensionMetadata: disable_sphinx_overloads() # instead, select the first overload manually - databricks.bundles.core.job_mutator = typing.get_overloads( - databricks.bundles.core.job_mutator - )[0] + for tpe in _ResourceType.all(): + mutator_fn = getattr(databricks.bundles.core, tpe.singular_name + "_mutator") + overloads = typing.get_overloads(mutator_fn) + + setattr(databricks.bundles.core, tpe.singular_name + "_mutator", overloads[0]) app.setup_extension("sphinx.ext.autodoc") diff --git a/experimental/python/docs/index.rst b/experimental/python/docs/index.rst index a8208edf51..42ce0b62bc 100644 --- a/experimental/python/docs/index.rst +++ b/experimental/python/docs/index.rst @@ -11,3 +11,4 @@ See `What is Python support for Databricks Asset Bundles? (TBD) <#>`_. 
databricks.bundles.core databricks.bundles.jobs + databricks.bundles.pipelines From dfc8de5f0e8bbc289f19707580479d9f516fe23c Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Tue, 8 Apr 2025 09:43:00 +0200 Subject: [PATCH 2/3] Address CR comments --- acceptance/bundle/python/mutator-ordering/script | 2 +- .../bundle/python/pipelines-support/databricks.yml | 4 ++-- .../bundle/python/pipelines-support/resources.py | 2 +- acceptance/bundle/python/pipelines-support/script | 2 +- .../bundle/python/pipelines-support/test.toml | 4 ++-- acceptance/bundle/python/resolve-variable/script | 2 +- acceptance/bundle/python/resource-loading/script | 2 +- .../bundle/python/restricted-execution/script | 2 +- acceptance/bundle/python/test.toml | 10 +++++++--- .../bundles/pipelines/_models/pipeline.py | 14 +++++++------- 10 files changed, 24 insertions(+), 20 deletions(-) diff --git a/acceptance/bundle/python/mutator-ordering/script b/acceptance/bundle/python/mutator-ordering/script index 1683990c57..9d590188d7 100644 --- a/acceptance/bundle/python/mutator-ordering/script +++ b/acceptance/bundle/python/mutator-ordering/script @@ -1,4 +1,4 @@ -UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" +echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt" # after mutators are applied, we expect to record location of the last mutator that had any effect diff --git a/acceptance/bundle/python/pipelines-support/databricks.yml b/acceptance/bundle/python/pipelines-support/databricks.yml index b30197752d..ad22be89fa 100644 --- a/acceptance/bundle/python/pipelines-support/databricks.yml +++ b/acceptance/bundle/python/pipelines-support/databricks.yml @@ -12,5 +12,5 @@ experimental: resources: pipelines: - my_pipeline_2: - name: "My Pipeline 2" + my_pipeline_1: + name: "My Pipeline 1" diff --git a/acceptance/bundle/python/pipelines-support/resources.py b/acceptance/bundle/python/pipelines-support/resources.py index 92108db8fa..fc028edd9a 100644 --- a/acceptance/bundle/python/pipelines-support/resources.py +++ b/acceptance/bundle/python/pipelines-support/resources.py @@ -4,6 +4,6 @@ def load_resources() -> Resources: resources = Resources() - resources.add_pipeline("my_pipeline_1", {"name": "My Pipeline 1"}) + resources.add_pipeline("my_pipeline_2", {"name": "My Pipeline 2"}) return resources diff --git a/acceptance/bundle/python/pipelines-support/script b/acceptance/bundle/python/pipelines-support/script index 1849f8e77a..c94fd42a0c 100644 --- a/acceptance/bundle/python/pipelines-support/script +++ b/acceptance/bundle/python/pipelines-support/script @@ -1,4 +1,4 @@ -UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" +echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt" trace uv run $UV_ARGS -q $CLI bundle validate --output json | \ jq "pick(.experimental.python, .resources)" diff --git a/acceptance/bundle/python/pipelines-support/test.toml b/acceptance/bundle/python/pipelines-support/test.toml index d73d9b2ee7..b43fe72b07 100644 --- a/acceptance/bundle/python/pipelines-support/test.toml +++ b/acceptance/bundle/python/pipelines-support/test.toml @@ -3,6 +3,6 @@ Cloud = false # tests don't interact with APIs [EnvMatrix] UV_ARGS = [ - # only works with the latest version of the wheel - "--with [DATABRICKS_BUNDLES_WHEEL] --no-cache", + # pipelines are only supported in the latest version of the wheel + "--with-requirements requirements-latest.txt --no-cache", ] diff --git a/acceptance/bundle/python/resolve-variable/script 
b/acceptance/bundle/python/resolve-variable/script index 1849f8e77a..c94fd42a0c 100644 --- a/acceptance/bundle/python/resolve-variable/script +++ b/acceptance/bundle/python/resolve-variable/script @@ -1,4 +1,4 @@ -UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" +echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt" trace uv run $UV_ARGS -q $CLI bundle validate --output json | \ jq "pick(.experimental.python, .resources)" diff --git a/acceptance/bundle/python/resource-loading/script b/acceptance/bundle/python/resource-loading/script index 9ba6008724..1ac6863729 100644 --- a/acceptance/bundle/python/resource-loading/script +++ b/acceptance/bundle/python/resource-loading/script @@ -1,4 +1,4 @@ -UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" +echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt" # each job should record location where add_job function was called diff --git a/acceptance/bundle/python/restricted-execution/script b/acceptance/bundle/python/restricted-execution/script index b470f97f09..07bb754467 100644 --- a/acceptance/bundle/python/restricted-execution/script +++ b/acceptance/bundle/python/restricted-execution/script @@ -1,4 +1,4 @@ -UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}" +echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt" export SOME_ENV_VAR="value_from_env" diff --git a/acceptance/bundle/python/test.toml b/acceptance/bundle/python/test.toml index 5c96741295..2812bef787 100644 --- a/acceptance/bundle/python/test.toml +++ b/acceptance/bundle/python/test.toml @@ -1,8 +1,12 @@ # Tests don't interact with APIs +Ignore = [ + # created by scripts to install the latest wheel with UV_ARGS + "requirements-latest.txt", +] + [EnvMatrix] UV_ARGS = [ "--with databricks-bundles==0.7.3", - # NB: test runner doesn't support substitutions, they are expanded in the script - "--with [DATABRICKS_BUNDLES_WHEEL] --no-cache", -] + "--with-requirements requirements-latest.txt --no-cache", +] \ No newline at end of file diff --git a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py index 2b5dbbe076..e3c944e3fa 100644 --- a/experimental/python/databricks/bundles/pipelines/_models/pipeline.py +++ b/experimental/python/databricks/bundles/pipelines/_models/pipeline.py @@ -1,14 +1,14 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, TypedDict +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, TypedDict -from databricks.bundles.core import Resource, VariableOrOptional +from databricks.bundles.core import Resource, VariableOrList, VariableOrOptional from databricks.bundles.core._transform import _transform from databricks.bundles.core._transform_to_json import _transform_to_json_value if TYPE_CHECKING: from typing_extensions import Self -# TODO generate this class and its dependencies +# TODO generate Pipeline class from jsonschema @dataclass(kw_only=True) @@ -16,9 +16,9 @@ class Pipeline(Resource): """""" name: VariableOrOptional[str] - """ - TODO - """ + + # permission field is always present after normalization, add stub not to error on unknown property + permissions: VariableOrList[Any] = field(default_factory=list) @classmethod def from_dict(cls, value: "PipelineDict") -> "Self": From f2461c2badbca98747a4008affa2cba56e2dac52 Mon Sep 17 00:00:00 2001 From: Gleb Kanterov Date: Tue, 8 Apr 2025 12:55:49 +0200 Subject: [PATCH 3/3] Fix eol --- 
acceptance/bundle/python/test.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acceptance/bundle/python/test.toml b/acceptance/bundle/python/test.toml index 2812bef787..08b731953d 100644 --- a/acceptance/bundle/python/test.toml +++ b/acceptance/bundle/python/test.toml @@ -9,4 +9,4 @@ Ignore = [ UV_ARGS = [ "--with databricks-bundles==0.7.3", "--with-requirements requirements-latest.txt --no-cache", -] \ No newline at end of file +]
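
Taken together, the series gives bundle authors the same Python surface for pipelines that jobs already have: `Resources.add_pipeline` to define pipelines in code and `@pipeline_mutator` to rewrite pipelines regardless of whether they come from YAML or Python. The sketch below is a minimal illustration that mirrors the `pipelines-support` acceptance test added in patch 1 (as adjusted by patch 2); it assumes a `databricks-bundles` wheel built from this series and is not part of the patches themselves.

```python
from dataclasses import replace

from databricks.bundles.core import Resources, pipeline_mutator
from databricks.bundles.pipelines import Pipeline


def load_resources() -> Resources:
    # registered in databricks.yml under experimental.python.resources
    # as "resources:load_resources"
    resources = Resources()

    # add_pipeline accepts either a Pipeline dataclass or a PipelineDict
    resources.add_pipeline("my_pipeline_2", {"name": "My Pipeline 2"})

    return resources


# registered in databricks.yml under experimental.python.mutators
# as "mutators:update_pipeline"; a mutator returns a new instance
# instead of modifying its input
@pipeline_mutator
def update_pipeline(pipeline: Pipeline) -> Pipeline:
    # name is VariableOrOptional[str], so narrow it before formatting
    assert isinstance(pipeline.name, str)

    return replace(pipeline, name=f"{pipeline.name} (updated)")
```

As the acceptance test's `output.txt` shows, the mutator is applied to both the YAML-defined `my_pipeline_1` and the Python-defined `my_pipeline_2`, so both names end up with the `(updated)` suffix after `bundle validate`.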