Skip to content

Commit 82f8d25

Browse files
committed
[Python] Support pipelines
1 parent 5b242f3 commit 82f8d25

File tree

19 files changed

+522
-53
lines changed

19 files changed

+522
-53
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
bundle:
2+
name: my_project
3+
4+
sync: { paths: [ ] } # don't need to copy files
5+
6+
experimental:
7+
python:
8+
resources:
9+
- "resources:load_resources"
10+
mutators:
11+
- "mutators:update_pipeline"
12+
13+
resources:
14+
pipelines:
15+
my_pipeline_2:
16+
name: "My Pipeline 2"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dataclasses import replace
2+
3+
from databricks.bundles.core import pipeline_mutator
4+
from databricks.bundles.pipelines import Pipeline
5+
6+
@pipeline_mutator
def update_pipeline(pipeline: Pipeline) -> Pipeline:
    """Append an "(updated)" suffix to the pipeline name.

    Used by the acceptance test to demonstrate that pipeline mutators
    are applied. Returns a new Pipeline instance instead of mutating
    the input, as required by the mutator contract.
    """
    # name may be a Variable reference; this test mutator only supports
    # plain string names.
    assert isinstance(pipeline.name, str)

    # Fix: suffix was "()updated)" — a typo for "(updated)".
    return replace(pipeline, name=f"{pipeline.name} (updated)")
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
2+
>>> uv run [UV_ARGS] -q [CLI] bundle validate --output json
3+
{
4+
"experimental": {
5+
"python": {
6+
"mutators": [
7+
"mutators:update_pipeline"
8+
],
9+
"resources": [
10+
"resources:load_resources"
11+
]
12+
}
13+
},
14+
"resources": {
15+
"pipelines": {
16+
"my_pipeline_1": {
17+
"deployment": {
18+
"kind": "BUNDLE",
19+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json"
20+
},
21+
"name": "My Pipeline 1",
22+
"permissions": []
23+
},
24+
"my_pipeline_2": {
25+
"deployment": {
26+
"kind": "BUNDLE",
27+
"metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/my_project/default/state/metadata.json"
28+
},
29+
"name": "My Pipeline 2",
30+
"permissions": []
31+
}
32+
}
33+
}
34+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from databricks.bundles.core import Resources
2+
3+
4+
def load_resources() -> Resources:
5+
resources = Resources()
6+
7+
resources.add_pipeline("my_pipeline_1", {"name": "My Pipeline 1"})
8+
9+
return resources
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Substitute the [DATABRICKS_BUNDLES_WHEEL] placeholder in UV_ARGS with the
# wheel path supplied by the test harness environment.
UV_ARGS="${UV_ARGS//\[\DATABRICKS_BUNDLES_WHEEL\]/$DATABRICKS_BUNDLES_WHEEL}"

# Validate the bundle and keep only the sections this test asserts on:
# the experimental Python configuration and the generated resources.
trace uv run $UV_ARGS -q $CLI bundle validate --output json | \
    jq "pick(.experimental.python, .resources)"

# Remove local state so it does not leak into subsequent test runs.
rm -fr .databricks __pycache__
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Local = true
2+
Cloud = false # tests don't interact with APIs
3+
4+
[EnvMatrix]
5+
UV_ARGS = [
6+
# only works with the latest version of the wheel
7+
"--with [DATABRICKS_BUNDLES_WHEEL]",
8+
]

acceptance/internal/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ type TestConfig struct {
8585
// set to that value (and replacement configured to match the value).
8686
// If there are multiple variables defined, all combinations of tests are created,
8787
// similar to github actions matrix strategy.
88-
EnvMatrix map[string][]string
88+
EnvMatrix map[string]*[]string
8989
}
9090

9191
type ServerStub struct {
@@ -183,7 +183,7 @@ func DoLoadConfig(t *testing.T, path string) TestConfig {
183183
// output: [["KEY=A", "OTHER=VALUE"], ["KEY=B", "OTHER=VALUE"]]
184184
//
185185
// If any entries is an empty list, that variable is dropped from the matrix before processing.
186-
func ExpandEnvMatrix(matrix map[string][]string) [][]string {
186+
func ExpandEnvMatrix(matrix map[string]*[]string) [][]string {
187187
result := [][]string{{}}
188188

189189
if len(matrix) == 0 {
@@ -193,8 +193,8 @@ func ExpandEnvMatrix(matrix map[string][]string) [][]string {
193193
// Filter out keys with empty value slices
194194
filteredMatrix := make(map[string][]string)
195195
for key, values := range matrix {
196-
if len(values) > 0 {
197-
filteredMatrix[key] = values
196+
if len(*values) > 0 {
197+
filteredMatrix[key] = *values
198198
}
199199
}
200200

experimental/python/databricks/bundles/build.py

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77
import sys
88
from copy import deepcopy
99
from dataclasses import dataclass, field, fields, replace
10-
from typing import Callable, Optional, TextIO
10+
from typing import (
11+
Callable,
12+
Optional,
13+
TextIO,
14+
)
1115

1216
from databricks.bundles.core import Bundle, Diagnostics, Location, Resources
1317
from databricks.bundles.core._resource_mutator import ResourceMutator
18+
from databricks.bundles.core._resource_type import _ResourceType
1419
from databricks.bundles.core._transform import _transform
15-
from databricks.bundles.jobs import Job
1620

1721
__all__ = []
1822

@@ -81,21 +85,23 @@ def _load_resources_from_input(input: dict) -> tuple[Resources, Diagnostics]:
8185
diagnostics = Diagnostics()
8286

8387
input_resources = input.get("resources", {})
84-
input_jobs = input_resources.get("jobs", {})
8588

86-
for resource_name, job_dict in input_jobs.items():
87-
try:
88-
job = Job.from_dict(job_dict)
89+
for tpe in _ResourceType.all():
90+
input_resources = input_resources.get(tpe.plural_name, {})
8991

90-
resources.add_job(resource_name, job)
91-
except Exception as exc:
92-
diagnostics = diagnostics.extend(
93-
Diagnostics.from_exception(
94-
exc=exc,
95-
summary="Error while loading job",
96-
path=("resources", "jobs", resource_name),
92+
for resource_name, resource_dict in input_resources.items():
93+
try:
94+
resource = _transform(tpe.resource_type, resource_dict)
95+
96+
resources.add_resource(resource_name=resource_name, resource=resource)
97+
except Exception as exc:
98+
diagnostics = diagnostics.extend(
99+
Diagnostics.from_exception(
100+
exc=exc,
101+
summary=f"Error while loading {tpe.singular_name}",
102+
path=("resources", tpe.plural_name, resource_name),
103+
)
97104
)
98-
)
99105

100106
return resources, diagnostics
101107

@@ -105,38 +111,56 @@ def _apply_mutators(
105111
resources: Resources,
106112
mutator_functions: list[ResourceMutator],
107113
) -> tuple[Resources, Diagnostics]:
108-
for resource_name, job in resources.jobs.items():
114+
diagnostics = Diagnostics()
115+
116+
for tpe in _ResourceType.all():
117+
resources, diagnostics = diagnostics.extend_tuple(
118+
_apply_mutators_for_type(bundle, resources, tpe, mutator_functions)
119+
)
120+
121+
return resources, diagnostics
122+
123+
124+
def _apply_mutators_for_type(
125+
bundle: Bundle,
126+
resources: Resources,
127+
tpe: _ResourceType,
128+
mutator_functions: list[ResourceMutator],
129+
) -> tuple[Resources, Diagnostics]:
130+
resources_dict = getattr(resources, tpe.plural_name)
131+
132+
for resource_name, resource in resources_dict.items():
109133
for mutator in mutator_functions:
110-
if mutator.resource_type != Job:
134+
if mutator.resource_type != tpe.resource_type:
111135
continue
112136

113137
location = Location.from_callable(mutator.function)
114138

115139
try:
116140
if _get_num_args(mutator.function) == 1:
117-
new_job = mutator.function(job)
141+
new_resource = mutator.function(resource)
118142
else:
119143
# defensive copy so that one function doesn't affect another
120-
new_job = mutator.function(deepcopy(bundle), job)
144+
new_resource = mutator.function(deepcopy(bundle), resource)
121145

122-
# mutating job in-place works, but we can't tell when it happens,
146+
# mutating resource in-place works, but we can't tell when it happens,
123147
# so we only update location if new instance is returned
124148

125-
if new_job is not job:
149+
if new_resource is not resource:
126150
if location:
127151
resources.add_location(
128-
("resources", "jobs", resource_name), location
152+
("resources", tpe.plural_name, resource_name), location
129153
)
130-
resources.jobs[resource_name] = new_job
131-
job = new_job
154+
resources_dict[resource_name] = new_resource
155+
resource = new_resource
132156
except Exception as exc:
133157
mutator_name = mutator.function.__name__
134158

135159
return resources, Diagnostics.from_exception(
136160
exc=exc,
137161
summary=f"Failed to apply '{mutator_name}' mutator",
138162
location=location,
139-
path=("resources", "jobs", resource_name),
163+
path=("resources", tpe.plural_name, resource_name),
140164
)
141165

142166
return resources, Diagnostics()
@@ -219,12 +243,19 @@ def _append_resources(bundle: dict, resources: Resources) -> dict:
219243

220244
new_bundle = bundle.copy()
221245

222-
if resources.jobs:
223-
new_bundle["resources"] = new_bundle.get("resources", {})
224-
new_bundle["resources"]["jobs"] = new_bundle["resources"].get("jobs", {})
246+
for tpe in _ResourceType.all():
247+
resources_dict = getattr(resources, tpe.plural_name)
225248

226-
for resource_name, resource in resources.jobs.items():
227-
new_bundle["resources"]["jobs"][resource_name] = resource.as_dict()
249+
if resources_dict:
250+
new_bundle["resources"] = new_bundle.get("resources", {})
251+
new_bundle["resources"][tpe.plural_name] = new_bundle["resources"].get(
252+
tpe.plural_name, {}
253+
)
254+
255+
for resource_name, resource in resources_dict.items():
256+
new_bundle["resources"][tpe.plural_name][resource_name] = (
257+
resource.as_dict()
258+
)
228259

229260
return new_bundle
230261

@@ -385,7 +416,7 @@ def _load_resource_mutator(
385416

386417
if instance and not isinstance(instance, ResourceMutator):
387418
return None, Diagnostics.create_error(
388-
f"'{name}' in module '{module_name}' is not instance of ResourceMutator, did you decorate it with @job_mutator?",
419+
f"'{name}' in module '{module_name}' is not instance of ResourceMutator, did you decorate it with @<resource_type>_mutator?",
389420
)
390421

391422
return instance, diagnostics

experimental/python/databricks/bundles/core/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"VariableOrList",
1414
"VariableOrOptional",
1515
"job_mutator",
16+
"pipeline_mutator",
1617
"load_resources_from_current_package_module",
1718
"load_resources_from_module",
1819
"load_resources_from_modules",
@@ -34,7 +35,11 @@
3435
)
3536
from databricks.bundles.core._location import Location
3637
from databricks.bundles.core._resource import Resource
37-
from databricks.bundles.core._resource_mutator import ResourceMutator, job_mutator
38+
from databricks.bundles.core._resource_mutator import (
39+
ResourceMutator,
40+
job_mutator,
41+
pipeline_mutator,
42+
)
3843
from databricks.bundles.core._resources import Resources
3944
from databricks.bundles.core._variable import (
4045
Variable,

experimental/python/databricks/bundles/core/_resource_mutator.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
if TYPE_CHECKING:
99
from databricks.bundles.jobs._models.job import Job
10+
from databricks.bundles.pipelines._models.pipeline import Pipeline
1011

1112
_T = TypeVar("_T", bound=Resource)
1213

@@ -94,3 +95,35 @@ def my_job_mutator(bundle: Bundle, job: Job) -> Job:
9495
from databricks.bundles.jobs._models.job import Job
9596

9697
return ResourceMutator(resource_type=Job, function=function)
98+
99+
100+
@overload
def pipeline_mutator(
    function: Callable[[Bundle, "Pipeline"], "Pipeline"],
) -> ResourceMutator["Pipeline"]: ...


@overload
def pipeline_mutator(
    function: Callable[["Pipeline"], "Pipeline"],
) -> ResourceMutator["Pipeline"]: ...


def pipeline_mutator(function: Callable) -> ResourceMutator["Pipeline"]:
    """
    Decorator for defining a pipeline mutator. Function should return a new instance of the pipeline with the desired changes,
    instead of mutating the input pipeline.

    The decorated function can accept either the pipeline alone, or the bundle
    and the pipeline (see overloads above).

    Example:

    .. code-block:: python

        @pipeline_mutator
        def my_pipeline_mutator(bundle: Bundle, pipeline: Pipeline) -> Pipeline:
            return replace(pipeline, name="my_pipeline")

    :param function: Function that mutates a pipeline.
    """
    # Local import, mirroring job_mutator: Pipeline is only imported when the
    # decorator is actually used, keeping module import lightweight.
    from databricks.bundles.pipelines._models.pipeline import Pipeline

    return ResourceMutator(resource_type=Pipeline, function=function)

0 commit comments

Comments
 (0)