
Commit 8978634

acc: Improve testserver pipelines; add update test local+cloud (#2937)

## Changes

- Update the testserver pipeline handlers to populate fields the same way the real server does.
- Add a new pipeline test that exercises Create and Update.
- Add a new script, read_id.py, that reads a resource id from the terraform state.

## Why

This test is used in #2926.

## Tests

New acceptance test, run both locally and as an integration test.
1 parent 5f30b6a commit 8978634

8 files changed: +178 additions, −28 deletions
acceptance/bin/read_id.py

Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Print selected attributes from terraform state.

Usage: <section> <name> [attr...]
"""

import sys
import os
import json


def print_resource_terraform(section, name):
    resource_type = "databricks_" + section[:-1]
    filename = ".databricks/bundle/default/terraform/terraform.tfstate"
    raw = open(filename).read()
    data = json.loads(raw)
    found = 0
    for r in data["resources"]:
        r_type = r["type"]
        r_name = r["name"]
        if r_type != resource_type:
            continue
        if r_name != name:
            continue
        for inst in r["instances"]:
            attribute_values = inst.get("attributes") or {}
            print(attribute_values.get("id"))
        return


print_resource_terraform(*sys.argv[1:])
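
For reference, read_id.py works by scanning the Terraform state file that `bundle deploy` writes. A minimal sketch of the state shape it expects, with an illustrative resource entry (the id value here is made up):

    import json

    # Illustrative terraform.tfstate content; only the fields the script reads are shown.
    state_json = """
    {
      "resources": [
        {
          "type": "databricks_pipeline",
          "name": "my",
          "instances": [
            {"attributes": {"id": "11111111-2222-3333-4444-555555555555"}}
          ]
        }
      ]
    }
    """

    data = json.loads(state_json)
    # Section "pipelines" maps to resource type "databricks_pipeline" (the trailing
    # "s" is stripped), and "my" is the resource name from the bundle config.
    for r in data["resources"]:
        if r["type"] == "databricks_pipeline" and r["name"] == "my":
            for inst in r["instances"]:
                print((inst.get("attributes") or {}).get("id"))

Invoked as `read_id.py pipelines my`, the script prints just that id, which the test script below feeds to `pipelines get`.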
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
bundle:
  name: acc-$UNIQUE_NAME

resources:
  pipelines:
    my:
      name: test-pipeline-$UNIQUE_NAME
      libraries:
        - file:
            path: "./foo.py"
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@

>>> [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

>>> print_requests
{
  "body": {
    "channel": "CURRENT",
    "deployment": {
      "kind": "BUNDLE",
      "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/state/metadata.json"
    },
    "edition": "ADVANCED",
    "libraries": [
      {
        "file": {
          "path": "/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/files/foo.py"
        }
      }
    ],
    "name": "test-pipeline-[UNIQUE_NAME]"
  },
  "method": "POST",
  "path": "/api/2.0/pipelines"
}
pipelines my id='[UUID]' name='test-pipeline-[UNIQUE_NAME]'

>>> update_file.py databricks.yml foo.py bar.py

>>> [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

>>> print_requests
{
  "body": {
    "channel": "CURRENT",
    "deployment": {
      "kind": "BUNDLE",
      "metadata_file_path": "/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/state/metadata.json"
    },
    "edition": "ADVANCED",
    "id": "[UUID]",
    "libraries": [
      {
        "file": {
          "path": "/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/files/bar.py"
        }
      }
    ],
    "name": "test-pipeline-[UNIQUE_NAME]",
    "storage": "dbfs:/pipelines/[UUID]"
  },
  "method": "PUT",
  "path": "/api/2.0/pipelines/[UUID]"
}
pipelines my id='[UUID]' name='test-pipeline-[UNIQUE_NAME]'

>>> [CLI] pipelines get [UUID]
{
  "creator_user_name":"[USERNAME]",
  "last_modified":[TIMESTAMP_MS],
  "name":"test-pipeline-[UNIQUE_NAME]",
  "pipeline_id":"[UUID]",
  "run_as_user_name":"[USERNAME]",
  "spec": {
    "channel":"CURRENT",
    "deployment": {
      "kind":"BUNDLE",
      "metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/state/metadata.json"
    },
    "edition":"ADVANCED",
    "id":"[UUID]",
    "libraries": [
      {
        "file": {
          "path":"/Workspace/Users/[USERNAME]/.bundle/acc-[UNIQUE_NAME]/default/files/bar.py"
        }
      }
    ],
    "name":"test-pipeline-[UNIQUE_NAME]",
    "storage":"dbfs:/pipelines/[UUID]"
  },
  "state":"IDLE"
}
Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
envsubst < databricks.yml.tmpl > databricks.yml
touch foo.py
touch bar.py
trace $CLI bundle deploy

print_requests() {
    jq --sort-keys 'select(.method != "GET" and (.path | contains("/pipelines")))' < out.requests.txt
    rm out.requests.txt
    read_state.py pipelines my id name
}

trace print_requests

trace update_file.py databricks.yml foo.py bar.py
trace $CLI bundle deploy
trace print_requests

trace $CLI pipelines get $(read_id.py pipelines my)
rm out.requests.txt
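
The jq filter in print_requests keeps only the mutating pipeline calls recorded by the test server. Assuming out.requests.txt holds one JSON object per line (which is how the jq invocation reads it), a rough Python equivalent of the filter would be:

    import json

    # Sketch: print non-GET requests whose path mentions /pipelines,
    # with keys sorted, mirroring `jq --sort-keys 'select(...)'`.
    with open("out.requests.txt") as f:
        for line in f:
            req = json.loads(line)
            if req["method"] != "GET" and "/pipelines" in req["path"]:
                print(json.dumps(req, indent=2, sort_keys=True))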
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
Cloud = true
Ignore = ["bar.py", "foo.py"]

[[Repls]]
Old = '174\d{10}'
New = '[TIMESTAMP_MS]'
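
The [[Repls]] rule normalizes nondeterministic output from the cloud run: `174\d{10}` matches the 13-digit epoch-millisecond timestamps (early-to-mid 2025) that show up in the recorded `last_modified` field. A quick sketch of the same substitution:

    import re

    # Sketch: rewrite 13-digit epoch-ms timestamps beginning with "174"
    # to the placeholder used in the recorded output.
    line = '"last_modified":1745678901234,'
    print(re.sub(r"174\d{10}", "[TIMESTAMP_MS]", line))
    # -> "last_modified":[TIMESTAMP_MS],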

acceptance/internal/handlers.go

Lines changed: 1 addition & 1 deletion
@@ -255,7 +255,7 @@ func addDefaultHandlers(server *testserver.Server) {
     // Pipelines:

     server.Handle("GET", "/api/2.0/pipelines/{pipeline_id}", func(req testserver.Request) any {
-        return req.Workspace.PipelineGet(req.Vars["pipeline_id"])
+        return testserver.MapGet(req.Workspace, req.Workspace.Pipelines, req.Vars["pipeline_id"])
     })

     server.Handle("POST", "/api/2.0/pipelines", func(req testserver.Request) any {

libs/testserver/fake_workspace.go

Lines changed: 2 additions & 2 deletions
@@ -40,7 +40,7 @@ type FakeWorkspace struct {
     Jobs    map[int64]jobs.Job
     JobRuns map[int64]jobs.Run

-    Pipelines map[string]pipelines.PipelineSpec
+    Pipelines map[string]pipelines.GetPipelineResponse
     Monitors  map[string]catalog.MonitorInfo
     Apps      map[string]apps.App
     Schemas   map[string]catalog.SchemaInfo
@@ -112,7 +112,7 @@ func NewFakeWorkspace(url string) *FakeWorkspace {
     JobRuns:      map[int64]jobs.Run{},
     nextJobId:    TestJobID,
     nextJobRunId: TestRunID,
-    Pipelines:    map[string]pipelines.PipelineSpec{},
+    Pipelines:    map[string]pipelines.GetPipelineResponse{},
     Monitors:     map[string]catalog.MonitorInfo{},
     Apps:         map[string]apps.App{},
     Schemas:      map[string]catalog.SchemaInfo{},

libs/testserver/pipelines.go

Lines changed: 18 additions & 25 deletions
@@ -3,49 +3,41 @@ package testserver
 import (
     "encoding/json"
     "fmt"
+    "time"

     "github.com/databricks/databricks-sdk-go/service/pipelines"
     "github.com/google/uuid"
 )

-func (s *FakeWorkspace) PipelineGet(pipelineId string) Response {
-    defer s.LockUnlock()()
-
-    spec, ok := s.Pipelines[pipelineId]
-    if !ok {
-        return Response{
-            StatusCode: 404,
-        }
-    }
-
-    return Response{
-        Body: pipelines.GetPipelineResponse{
-            PipelineId: pipelineId,
-            Spec:       &spec,
-        },
-    }
-}
-
 func (s *FakeWorkspace) PipelineCreate(req Request) Response {
     defer s.LockUnlock()()

-    var r pipelines.PipelineSpec
-    err := json.Unmarshal(req.Body, &r)
+    spec := pipelines.PipelineSpec{}
+    err := json.Unmarshal(req.Body, &spec)
     if err != nil {
         return Response{
             Body:       fmt.Sprintf("cannot unmarshal request body: %s", err),
             StatusCode: 400,
         }
     }

+    var r pipelines.GetPipelineResponse
+    r.Spec = &spec
+
     pipelineId := uuid.New().String()
-    r.Id = pipelineId
+    spec.Id = pipelineId
+    r.PipelineId = pipelineId
+    r.CreatorUserName = "tester@databricks.com"
+    r.LastModified = time.Now().UnixMilli()
+    r.Name = r.Spec.Name
+    r.RunAsUserName = "tester@databricks.com"
+    r.State = "IDLE"

     // If the pipeline definition does not specify a catalog, it switches to Hive metastore mode
     // and if the storage location is not specified, API automatically generates a storage location
     // (ref: https://docs.databricks.com/gcp/en/dlt/hive-metastore#specify-a-storage-location)
-    if r.Storage == "" && r.Catalog == "" {
-        r.Storage = "dbfs:/pipelines/" + pipelineId
+    if spec.Storage == "" && spec.Catalog == "" {
+        spec.Storage = "dbfs:/pipelines/" + pipelineId
     }
     s.Pipelines[pipelineId] = r

@@ -68,14 +60,15 @@ func (s *FakeWorkspace) PipelineUpdate(req Request, pipelineId string) Response
         }
     }

-    _, exists := s.Pipelines[pipelineId]
+    item, exists := s.Pipelines[pipelineId]
     if !exists {
         return Response{
             StatusCode: 404,
         }
     }

-    s.Pipelines[pipelineId] = request
+    item.Spec = &request
+    s.Pipelines[pipelineId] = item

     return Response{
         Body: pipelines.EditPipelineResponse{},