Skip to content

Commit 13cb417

Browse files
author
Jesse
authored
[PECO-840] Port staging ingestion behaviour to new UC Volumes (#235)
Signed-off-by: Jesse Whitehouse <jesse.whitehouse@databricks.com>
1 parent 241e934 commit 13cb417

File tree

4 files changed

+252
-6
lines changed

4 files changed

+252
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 2.9.4 (Unreleased)
44

55
- Other: Introduce SQLAlchemy dialect compliance test suite and enumerate all excluded tests
6+
- Add integration tests for Databricks UC Volumes ingestion queries
67

78
## 2.9.3 (2023-08-24)
89

tests/e2e/common/staging_ingestion_tests.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def test_staging_ingestion_life_cycle(self):
7676

7777
# GET after REMOVE should fail
7878

79-
with pytest.raises(Error):
79+
with pytest.raises(Error, match="Staging operation over HTTP was unsuccessful: 404"):
8080
cursor = conn.cursor()
8181
query = f"GET 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
8282
cursor.execute(query)
@@ -97,7 +97,7 @@ def test_staging_ingestion_put_fails_without_staging_allowed_local_path(self):
9797
with open(fh, "wb") as fp:
9898
fp.write(original_text)
9999

100-
with pytest.raises(Error):
100+
with pytest.raises(Error, match="You must provide at least one staging_allowed_local_path"):
101101
with self.connection() as conn:
102102
cursor = conn.cursor()
103103
query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
@@ -118,7 +118,7 @@ def test_staging_ingestion_put_fails_if_localFile_not_in_staging_allowed_local_p
118118
# Add junk to base_path
119119
base_path = os.path.join(base_path, "temp")
120120

121-
with pytest.raises(Error):
121+
with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
122122
with self.connection(extra_params={"staging_allowed_local_path": base_path}) as conn:
123123
cursor = conn.cursor()
124124
query = f"PUT '{temp_path}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
@@ -221,7 +221,7 @@ def test_staging_ingestion_put_fails_if_absolute_localFile_not_in_staging_allowe
221221
staging_allowed_local_path = "/var/www/html"
222222
target_file = "/var/www/html/../html1/not_allowed.html"
223223

224-
with pytest.raises(Error):
224+
with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
225225
with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn:
226226
cursor = conn.cursor()
227227
query = f"PUT '{target_file}' INTO 'stage://tmp/{self.staging_ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
tests/e2e/common/uc_volume_tests.py

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
import os
2+
import tempfile
3+
4+
import pytest
5+
import databricks.sql as sql
6+
from databricks.sql import Error
7+
8+
@pytest.fixture(scope="module", autouse=True)
def check_catalog_and_schema():
    """Verify that the environment names a catalog and a schema for the UC Volume tests.

    Reads the ``catalog`` and ``schema`` environment variables and raises if either
    one is unset. Per the original author's note, the fixture only evaluates when the
    test isn't skipped.

    Raises:
        ValueError: when ``catalog`` or ``schema`` is missing from the environment.
    """
    # NOTE(review): an autouse fixture declared in a module normally applies only to
    # tests collected from that same module -- confirm it actually runs for suites
    # that import the mixin from elsewhere.
    _catalog, _schema = (os.environ.get(key) for key in ("catalog", "schema"))

    if any(value is None for value in (_catalog, _schema)):
        raise ValueError(
            f"UC Volume tests require values for the `catalog` and `schema` environment variables. Found catalog {_catalog} schema {_schema}"
        )
21+
22+
class PySQLUCVolumeTestSuiteMixin:
    """Simple namespace for UC Volume tests.

    In addition to connection credentials (host, path, token) this suite requires env vars
    named catalog and schema.

    Every test calls ``self.connection(...)``, which is not defined in this class;
    presumably the test case this mixin is combined with provides it.
    """

    # Read once, when the class body executes; either value is None if its env var is unset.
    catalog, schema = os.getenv("catalog"), os.getenv("schema")

    @staticmethod
    def _uc_volume_make_local_file(contents=b"hello world!"):
        """Write *contents* to a fresh temporary file and return its path.

        The descriptor returned by ``mkstemp`` is closed before returning; the caller
        is responsible for deleting the file.
        """
        fd, path = tempfile.mkstemp()
        with open(fd, "wb") as fp:
            fp.write(contents)
        return path

    def _uc_volume_file(self, filename="file1.csv"):
        """Return the path of *filename* inside the ``e2etests`` volume used by this suite."""
        return f"/Volumes/{self.catalog}/{self.schema}/e2etests/{filename}"

    def test_uc_volume_life_cycle(self):
        """PUT a file into the UC Volume
        GET the file from the UC Volume
        REMOVE the file from the UC Volume
        Try to GET the file again expecting to raise an exception
        """

        original_text = "hello world!".encode("utf-8")
        volume_file = self._uc_volume_file()

        temp_path = self._uc_volume_make_local_file(original_text)
        new_temp_path = None

        try:
            # PUT should succeed

            with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn:
                cursor = conn.cursor()
                query = f"PUT '{temp_path}' INTO '{volume_file}' OVERWRITE"
                cursor.execute(query)

            # GET should succeed

            new_fh, new_temp_path = tempfile.mkstemp()
            # Only the path is needed as the download target; close the descriptor
            # right away so it cannot leak if the GET below fails.
            os.close(new_fh)

            with self.connection(extra_params={"staging_allowed_local_path": new_temp_path}) as conn:
                cursor = conn.cursor()
                query = f"GET '{volume_file}' TO '{new_temp_path}'"
                cursor.execute(query)

            with open(new_temp_path, "rb") as fp:
                fetched_text = fp.read()

            assert fetched_text == original_text

            # REMOVE should succeed

            remove_query = f"REMOVE '{volume_file}'"

            with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn:
                cursor = conn.cursor()
                cursor.execute(remove_query)

                # GET after REMOVE should fail

                with pytest.raises(Error, match="Staging operation over HTTP was unsuccessful: 404"):
                    cursor = conn.cursor()
                    query = f"GET '{volume_file}' TO '{new_temp_path}'"
                    cursor.execute(query)
        finally:
            # Remove the local files whether or not the assertions above passed.
            os.remove(temp_path)
            if new_temp_path is not None:
                os.remove(new_temp_path)

    def test_uc_volume_put_fails_without_staging_allowed_local_path(self):
        """PUT operations are not supported unless the connection was built with
        a parameter called staging_allowed_local_path
        """

        temp_path = self._uc_volume_make_local_file()

        try:
            with pytest.raises(Error, match="You must provide at least one staging_allowed_local_path"):
                with self.connection() as conn:
                    cursor = conn.cursor()
                    query = f"PUT '{temp_path}' INTO '{self._uc_volume_file()}' OVERWRITE"
                    cursor.execute(query)
        finally:
            os.remove(temp_path)

    def test_uc_volume_put_fails_if_localFile_not_in_staging_allowed_local_path(self):
        """PUT is expected to fail when the local file lies outside staging_allowed_local_path."""

        temp_path = self._uc_volume_make_local_file()

        # Add junk to base_path so the allowed directory no longer contains temp_path
        base_path = os.path.join(os.path.dirname(temp_path), "temp")

        try:
            with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
                with self.connection(extra_params={"staging_allowed_local_path": base_path}) as conn:
                    cursor = conn.cursor()
                    query = f"PUT '{temp_path}' INTO '{self._uc_volume_file()}' OVERWRITE"
                    cursor.execute(query)
        finally:
            os.remove(temp_path)

    def test_uc_volume_put_fails_if_file_exists_and_overwrite_not_set(self):
        """PUT a file into the UC Volume twice. First command should succeed. Second should fail.
        """

        temp_path = self._uc_volume_make_local_file()
        volume_file = self._uc_volume_file()

        def perform_put():
            with self.connection(extra_params={"staging_allowed_local_path": temp_path}) as conn:
                cursor = conn.cursor()
                query = f"PUT '{temp_path}' INTO '{volume_file}'"
                cursor.execute(query)

        def perform_remove():
            remove_query = f"REMOVE '{volume_file}'"

            with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn:
                cursor = conn.cursor()
                cursor.execute(remove_query)

        try:
            # Make sure file does not exist
            perform_remove()

            # Put the file
            perform_put()

            try:
                # Try to put it again
                with pytest.raises(sql.exc.ServerOperationError, match="FILE_IN_STAGING_PATH_ALREADY_EXISTS"):
                    perform_put()
            finally:
                # Clean up after ourselves, even if the assertion above failed
                perform_remove()
        finally:
            os.remove(temp_path)

    def test_uc_volume_put_fails_if_absolute_localFile_not_in_staging_allowed_local_path(self):
        """
        This test confirms that staging_allowed_local_path and target_file are resolved into absolute paths.
        """

        # If these two paths are not resolved absolutely, they appear to share a common path of /var/www/html.
        # After resolution their common path is only /var/www, which should raise an exception
        # because the common path must always be equal to staging_allowed_local_path.
        staging_allowed_local_path = "/var/www/html"
        target_file = "/var/www/html/../html1/not_allowed.html"

        with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
            with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '{self._uc_volume_file()}' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_empty_local_path_fails_to_parse_at_server(self):
        """A PUT with an empty local path is expected to raise EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY."""

        staging_allowed_local_path = "/var/www/html"
        target_file = ""

        with pytest.raises(Error, match="EMPTY_LOCAL_FILE_IN_STAGING_ACCESS_QUERY"):
            with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '{self._uc_volume_file()}' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_invalid_volume_path_fails_at_server(self):
        """A PUT into a volume path with a bogus leading segment is expected to raise NOT_FOUND: CATALOG."""

        staging_allowed_local_path = "/var/www/html"
        target_file = "index.html"

        with pytest.raises(Error, match="NOT_FOUND: CATALOG"):
            with self.connection(extra_params={"staging_allowed_local_path": staging_allowed_local_path}) as conn:
                cursor = conn.cursor()
                query = f"PUT '{target_file}' INTO '/Volumes/RANDOMSTRINGOFCHARACTERS/{self.catalog}/{self.schema}/e2etests/file1.csv' OVERWRITE"
                cursor.execute(query)

    def test_uc_volume_supports_multiple_staging_allowed_local_path_values(self):
        """staging_allowed_local_path may be either a path-like object or a list of path-like objects.

        This test confirms that configuring two base paths:
            1 - doesn't raise an exception
            2 - allows uploads from both paths
            3 - doesn't allow uploads from a third path
        """

        def generate_file_and_path_and_queries():
            """
            1. Makes a temp file with some contents.
            2. Write a query to PUT it into the UC Volume
            3. Write a query to REMOVE it from that location (for cleanup)
            """
            temp_path = self._uc_volume_make_local_file()
            volume_file = self._uc_volume_file(f"{id(temp_path)}.csv")
            put_query = f"PUT '{temp_path}' INTO '{volume_file}' OVERWRITE"
            remove_query = f"REMOVE '{volume_file}'"
            return temp_path, put_query, remove_query

        temp_path1, put_query1, remove_query1 = generate_file_and_path_and_queries()
        temp_path2, put_query2, remove_query2 = generate_file_and_path_and_queries()
        temp_path3, put_query3, remove_query3 = generate_file_and_path_and_queries()

        try:
            with self.connection(extra_params={"staging_allowed_local_path": [temp_path1, temp_path2]}) as conn:
                cursor = conn.cursor()

                cursor.execute(put_query1)
                cursor.execute(put_query2)

                try:
                    with pytest.raises(Error, match="Local file operations are restricted to paths within the configured staging_allowed_local_path"):
                        cursor.execute(put_query3)
                finally:
                    # Then clean up the files we made, even if the assertion above failed
                    cursor.execute(remove_query1)
                    cursor.execute(remove_query2)
        finally:
            # The third file was never uploaded, but its local copy still needs removing.
            for local_path in (temp_path1, temp_path2, temp_path3):
                os.remove(local_path)

tests/e2e/test_driver.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from tests.e2e.common.staging_ingestion_tests import PySQLStagingIngestionTestSuiteMixin
3030
from tests.e2e.common.retry_test_mixins import PySQLRetryTestsMixin
3131
from tests.e2e.common.parameterized_query_tests import PySQLParameterizedQueryTestSuiteMixin
32+
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
3233

3334
log = logging.getLogger(__name__)
3435

@@ -143,7 +144,7 @@ def test_cloud_fetch(self):
143144
# Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
144145
# tests
145146
class PySQLCoreTestSuite(SmokeTestMixin, CoreTestMixin, DecimalTestsMixin, TimestampTestsMixin,
146-
PySQLTestCase, PySQLStagingIngestionTestSuiteMixin, PySQLRetryTestsMixin, PySQLParameterizedQueryTestSuiteMixin):
147+
PySQLTestCase, PySQLStagingIngestionTestSuiteMixin, PySQLRetryTestsMixin, PySQLParameterizedQueryTestSuiteMixin, PySQLUCVolumeTestSuiteMixin):
147148
validate_row_value_type = True
148149
validate_result = True
149150

@@ -709,4 +710,4 @@ def main(cli_args):
709710

710711

711712
if __name__ == "__main__":
712-
main(sys.argv[1:])
713+
main(sys.argv[1:])

0 commit comments

Comments
 (0)