Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion acceptance/bundle/python/mutator-ordering/script
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}"
echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt"

# after mutators are applied, we expect to record location of the last mutator that had any effect

Expand Down
16 changes: 16 additions & 0 deletions acceptance/bundle/python/pipelines-support/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
bundle:
  name: my_project

sync: { paths: [ ] } # don't need to copy files

experimental:
  python:
    # Entry points in "<module>:<function>" form, resolved from the bundle root.
    resources:
      - "resources:load_resources"
    mutators:
      - "mutators:update_pipeline"

resources:
  pipelines:
    my_pipeline_1:
      name: "My Pipeline 1"
11 changes: 11 additions & 0 deletions acceptance/bundle/python/pipelines-support/mutators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dataclasses import replace

from databricks.bundles.core import pipeline_mutator
from databricks.bundles.pipelines import Pipeline


@pipeline_mutator
def update_pipeline(pipeline: Pipeline) -> Pipeline:
    """Return a copy of *pipeline* with " (updated)" appended to its name."""

    name = pipeline.name
    assert isinstance(name, str)

    return replace(pipeline, name=name + " (updated)")
34 changes: 34 additions & 0 deletions acceptance/bundle/python/pipelines-support/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

>>> uv run [UV_ARGS] -q [CLI] bundle validate --output json
{
"experimental": {
"python": {
"mutators": [
"mutators:update_pipeline"
],
"resources": [
"resources:load_resources"
]
}
},
"resources": {
"pipelines": {
"my_pipeline_1": {
"deployment": {
"kind": "BUNDLE",
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json"
},
"name": "My Pipeline 1 (updated)",
"permissions": []
},
"my_pipeline_2": {
"deployment": {
"kind": "BUNDLE",
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json"
},
"name": "My Pipeline 2 (updated)",
"permissions": []
}
}
}
}
9 changes: 9 additions & 0 deletions acceptance/bundle/python/pipelines-support/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from databricks.bundles.core import Resources


def load_resources() -> Resources:
    """Build the resources defined in Python code for this bundle."""

    result = Resources()
    result.add_pipeline("my_pipeline_2", {"name": "My Pipeline 2"})

    return result
6 changes: 6 additions & 0 deletions acceptance/bundle/python/pipelines-support/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Record the locally-built wheel path so UV_ARGS can install it via
# --with-requirements (see the EnvMatrix in test.toml).
echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt"

# NOTE: $UV_ARGS and $CLI are intentionally unquoted — word splitting is relied on.
trace uv run $UV_ARGS -q $CLI bundle validate --output json | \
jq "pick(.experimental.python, .resources)"

# Clean up generated state so it doesn't leak into the recorded test output.
rm -fr .databricks __pycache__
8 changes: 8 additions & 0 deletions acceptance/bundle/python/pipelines-support/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Local = true
Cloud = false # tests don't interact with APIs

[EnvMatrix]
UV_ARGS = [
# pipelines are only supported in the latest version of the wheel
"--with-requirements requirements-latest.txt --no-cache",
]
2 changes: 1 addition & 1 deletion acceptance/bundle/python/resolve-variable/script
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}"
echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt"

trace uv run $UV_ARGS -q $CLI bundle validate --output json | \
jq "pick(.experimental.python, .resources)"
Expand Down
2 changes: 1 addition & 1 deletion acceptance/bundle/python/resource-loading/script
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}"
echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt"

# each job should record location where add_job function was called

Expand Down
2 changes: 1 addition & 1 deletion acceptance/bundle/python/restricted-execution/script
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}"
echo "$DATABRICKS_BUNDLES_WHEEL" > "requirements-latest.txt"

export SOME_ENV_VAR="value_from_env"

Expand Down
8 changes: 6 additions & 2 deletions acceptance/bundle/python/test.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# Tests don't interact with APIs

Ignore = [
# created by scripts to install the latest wheel with UV_ARGS
"requirements-latest.txt",
]

[EnvMatrix]
UV_ARGS = [
"--with databricks-bundles==0.7.3",
# NB: test runner doesn't support substitutions, they are expanded in the script
"--with [DATABRICKS_BUNDLES_WHEEL] --no-cache",
"--with-requirements requirements-latest.txt --no-cache",
]
91 changes: 61 additions & 30 deletions experimental/python/databricks/bundles/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import sys
from copy import deepcopy
from dataclasses import dataclass, field, fields, replace
from typing import Callable, Optional, TextIO
from typing import (
Callable,
Optional,
TextIO,
)

from databricks.bundles.core import Bundle, Diagnostics, Location, Resources
from databricks.bundles.core._resource_mutator import ResourceMutator
from databricks.bundles.core._resource_type import _ResourceType
from databricks.bundles.core._transform import _transform
from databricks.bundles.jobs import Job

__all__ = []

Expand Down Expand Up @@ -81,21 +85,23 @@ def _load_resources_from_input(input: dict) -> tuple[Resources, Diagnostics]:
diagnostics = Diagnostics()

input_resources = input.get("resources", {})
input_jobs = input_resources.get("jobs", {})

for resource_name, job_dict in input_jobs.items():
try:
job = Job.from_dict(job_dict)
for tpe in _ResourceType.all():
input_resources_by_tpe = input_resources.get(tpe.plural_name, {})

resources.add_job(resource_name, job)
except Exception as exc:
diagnostics = diagnostics.extend(
Diagnostics.from_exception(
exc=exc,
summary="Error while loading job",
path=("resources", "jobs", resource_name),
for resource_name, resource_dict in input_resources_by_tpe.items():
try:
resource = _transform(tpe.resource_type, resource_dict)

resources.add_resource(resource_name=resource_name, resource=resource)
except Exception as exc:
diagnostics = diagnostics.extend(
Diagnostics.from_exception(
exc=exc,
summary=f"Error while loading {tpe.singular_name}",
path=("resources", tpe.plural_name, resource_name),
)
)
)

return resources, diagnostics

Expand All @@ -105,38 +111,56 @@ def _apply_mutators(
resources: Resources,
mutator_functions: list[ResourceMutator],
) -> tuple[Resources, Diagnostics]:
for resource_name, job in resources.jobs.items():
diagnostics = Diagnostics()

for tpe in _ResourceType.all():
resources, diagnostics = diagnostics.extend_tuple(
_apply_mutators_for_type(bundle, resources, tpe, mutator_functions)
)

return resources, diagnostics


def _apply_mutators_for_type(
bundle: Bundle,
resources: Resources,
tpe: _ResourceType,
mutator_functions: list[ResourceMutator],
) -> tuple[Resources, Diagnostics]:
resources_dict = getattr(resources, tpe.plural_name)

for resource_name, resource in resources_dict.items():
for mutator in mutator_functions:
if mutator.resource_type != Job:
if mutator.resource_type != tpe.resource_type:
continue

location = Location.from_callable(mutator.function)

try:
if _get_num_args(mutator.function) == 1:
new_job = mutator.function(job)
new_resource = mutator.function(resource)
else:
# defensive copy so that one function doesn't affect another
new_job = mutator.function(deepcopy(bundle), job)
new_resource = mutator.function(deepcopy(bundle), resource)

# mutating job in-place works, but we can't tell when it happens,
# mutating resource in-place works, but we can't tell when it happens,
# so we only update location if new instance is returned

if new_job is not job:
if new_resource is not resource:
if location:
resources.add_location(
("resources", "jobs", resource_name), location
("resources", tpe.plural_name, resource_name), location
)
resources.jobs[resource_name] = new_job
job = new_job
resources_dict[resource_name] = new_resource
resource = new_resource
except Exception as exc:
mutator_name = mutator.function.__name__

return resources, Diagnostics.from_exception(
exc=exc,
summary=f"Failed to apply '{mutator_name}' mutator",
location=location,
path=("resources", "jobs", resource_name),
path=("resources", tpe.plural_name, resource_name),
)

return resources, Diagnostics()
Expand Down Expand Up @@ -219,12 +243,19 @@ def _append_resources(bundle: dict, resources: Resources) -> dict:

new_bundle = bundle.copy()

if resources.jobs:
new_bundle["resources"] = new_bundle.get("resources", {})
new_bundle["resources"]["jobs"] = new_bundle["resources"].get("jobs", {})
for tpe in _ResourceType.all():
resources_dict = getattr(resources, tpe.plural_name)

for resource_name, resource in resources.jobs.items():
new_bundle["resources"]["jobs"][resource_name] = resource.as_dict()
if resources_dict:
new_bundle["resources"] = new_bundle.get("resources", {})
new_bundle["resources"][tpe.plural_name] = new_bundle["resources"].get(
tpe.plural_name, {}
)

for resource_name, resource in resources_dict.items():
new_bundle["resources"][tpe.plural_name][resource_name] = (
resource.as_dict()
)

return new_bundle

Expand Down Expand Up @@ -385,7 +416,7 @@ def _load_resource_mutator(

if instance and not isinstance(instance, ResourceMutator):
return None, Diagnostics.create_error(
f"'{name}' in module '{module_name}' is not instance of ResourceMutator, did you decorate it with @job_mutator?",
f"'{name}' in module '{module_name}' is not instance of ResourceMutator, did you decorate it with @<resource_type>_mutator?",
)

return instance, diagnostics
Expand Down
7 changes: 6 additions & 1 deletion experimental/python/databricks/bundles/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"VariableOrList",
"VariableOrOptional",
"job_mutator",
"pipeline_mutator",
"load_resources_from_current_package_module",
"load_resources_from_module",
"load_resources_from_modules",
Expand All @@ -34,7 +35,11 @@
)
from databricks.bundles.core._location import Location
from databricks.bundles.core._resource import Resource
from databricks.bundles.core._resource_mutator import ResourceMutator, job_mutator
from databricks.bundles.core._resource_mutator import (
ResourceMutator,
job_mutator,
pipeline_mutator,
)
from databricks.bundles.core._resources import Resources
from databricks.bundles.core._variable import (
Variable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

if TYPE_CHECKING:
from databricks.bundles.jobs._models.job import Job
from databricks.bundles.pipelines._models.pipeline import Pipeline

_T = TypeVar("_T", bound=Resource)

Expand Down Expand Up @@ -94,3 +95,35 @@ def my_job_mutator(bundle: Bundle, job: Job) -> Job:
from databricks.bundles.jobs._models.job import Job

return ResourceMutator(resource_type=Job, function=function)


@overload
def pipeline_mutator(
    function: Callable[[Bundle, "Pipeline"], "Pipeline"],
) -> ResourceMutator["Pipeline"]: ...


@overload
def pipeline_mutator(
    function: Callable[["Pipeline"], "Pipeline"],
) -> ResourceMutator["Pipeline"]: ...


def pipeline_mutator(function: Callable) -> ResourceMutator["Pipeline"]:
    """
    Decorator for defining a pipeline mutator. Function should return a new instance of the pipeline with the desired changes,
    instead of mutating the input pipeline.

    Example:

    .. code-block:: python

        @pipeline_mutator
        def my_pipeline_mutator(bundle: Bundle, pipeline: Pipeline) -> Pipeline:
            return replace(pipeline, name="my_pipeline")

    :param function: Function that mutates a pipeline.
    """
    # Deferred import so that importing this module doesn't pull in the
    # pipelines package (mirrors the lazy import in job_mutator).
    from databricks.bundles.pipelines._models.pipeline import Pipeline

    return ResourceMutator(resource_type=Pipeline, function=function)
48 changes: 48 additions & 0 deletions experimental/python/databricks/bundles/core/_resource_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from typing import Type

from databricks.bundles.core._resource import Resource


@dataclass(kw_only=True, frozen=True)
class _ResourceType:
    """
    Metadata describing one resource kind supported by the Python bundle code.

    NB: this class should stay internal-only and NOT be exported from databricks.bundles.core
    """

    resource_type: Type[Resource]

    # Singular name, used in method names (e.g. "add_job"), error messages
    # and parameter names.
    singular_name: str

    # Plural name, identical to the key used in the "resources" bundle section.
    plural_name: str

    @classmethod
    def all(cls) -> tuple["_ResourceType", ...]:
        """
        Returns all supported resource types.
        """

        # Imports are deliberately deferred so that importing
        # databricks.bundles.core doesn't drag databricks.bundles.<resource_type>
        # in at module-import time.
        from databricks.bundles.jobs._models.job import Job
        from databricks.bundles.pipelines._models.pipeline import Pipeline

        job_type = cls(
            resource_type=Job,
            singular_name="job",
            plural_name="jobs",
        )
        pipeline_type = cls(
            resource_type=Pipeline,
            singular_name="pipeline",
            plural_name="pipelines",
        )

        return (job_type, pipeline_type)
Loading
Loading