Commit 79d2197

Base changes
1 parent 2ab9a5f commit 79d2197

7 files changed (+102 -21 lines)

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
@@ -5,6 +5,16 @@
 - Split the connector into two separate packages: `databricks-sql-connector` and `databricks-sqlalchemy`. The `databricks-sql-connector` package contains the core functionality of the connector, while the `databricks-sqlalchemy` package contains the SQLAlchemy dialect for the connector.
 - Pyarrow dependency is now optional in `databricks-sql-connector`. Users needing arrow are supposed to explicitly install pyarrow
 
+# 3.7.3 (2025-03-28)
+
+- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db)
+- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db)
+- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db)
+
+# 3.7.2 (2025-01-31)
+
+- Updated the retry_delay_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db)
+
 # 3.7.1 (2025-01-07)
 
 - Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)
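For context on the execute_async poll fix above (databricks/databricks-sql-python#515), the caller-side pattern it targets looks roughly like the sketch below. This is a minimal sketch with placeholder connection parameters and an arbitrary 2-second poll interval; the calls it uses (execute_async, get_query_state, get_async_execution_result, fetchall) are the ones exercised by the e2e tests later in this commit.

    import time
    from databricks import sql
    from databricks.sql.thrift_api.TCLIService import ttypes

    def is_executing(state):
        # Still running if there is no state yet, or the state is PENDING/RUNNING.
        return not state or state in (
            ttypes.TOperationState.RUNNING_STATE,
            ttypes.TOperationState.PENDING_STATE,
        )

    # Placeholder connection parameters; substitute real workspace values.
    with sql.connect(server_hostname="...", http_path="...", access_token="...") as conn:
        with conn.cursor() as cursor:
            cursor.execute_async("SELECT 1")  # a small-result query, the case fixed by #515

            while is_executing(cursor.get_query_state()):
                time.sleep(2)  # arbitrary polling interval

            cursor.get_async_execution_result()
            print(cursor.fetchall())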

src/databricks/sql/auth/retry.py

Lines changed: 12 additions & 9 deletions
@@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
         else:
             proposed_wait = self.get_backoff_time()
 
-        proposed_wait = min(proposed_wait, self.delay_max)
+        proposed_wait = max(proposed_wait, self.delay_max)
         self.check_proposed_wait(proposed_wait)
+        logger.debug(f"Retrying after {proposed_wait} seconds")
         time.sleep(proposed_wait)
         return True
 
@@ -344,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
         if a retry would violate the configured policy.
         """
 
+        logger.info(f"Received status code {status_code} for {method} request")
+
         # Request succeeded. Don't retry.
         if status_code == 200:
             return False, "200 codes are not retried"
 
         if status_code == 401:
-            raise NonRecoverableNetworkError(
-                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
+            return (
+                False,
+                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
             )
 
         if status_code == 403:
-            raise NonRecoverableNetworkError(
-                "Received 403 - FORBIDDEN. Confirm your authentication credentials."
-            )
+            return False, "403 codes are not retried"
 
         # Request failed and server said NotImplemented. This isn't recoverable. Don't retry.
         if status_code == 501:
-            raise NonRecoverableNetworkError("Received code 501 from server.")
+            return False, "Received code 501 from server."
 
         # Request failed and this method is not retryable. We only retry POST requests.
         if not self._is_method_retryable(method):
@@ -399,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
             and status_code not in self.status_forcelist
             and status_code not in self.force_dangerous_codes
         ):
-            raise UnsafeToRetryError(
-                "ExecuteStatement command can only be retried for codes 429 and 503. Received code: {status_code}"
+            return (
+                False,
+                "ExecuteStatement command can only be retried for codes 429 and 503",
            )
 
         # Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this

src/databricks/sql/auth/thrift_http_client.py

Lines changed: 6 additions & 0 deletions
@@ -198,6 +198,12 @@ def flush(self):
         self.message = self.__resp.reason
         self.headers = self.__resp.headers
 
+        logger.info(
+            "HTTP Response with status code {}, message: {}".format(
+                self.code, self.message
+            )
+        )
+
     @staticmethod
     def basic_proxy_auth_headers(proxy):
         if proxy is None or not proxy.username:
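To actually see the new request/response log lines added here and in retry.py, it is enough to enable INFO logging for the connector's namespace. A minimal sketch, assuming the modules use the conventional logging.getLogger(__name__) loggers under "databricks.sql":

    import logging

    # Assumption: the connector's loggers live under the "databricks.sql" namespace.
    logging.basicConfig(level=logging.INFO)
    logging.getLogger("databricks.sql").setLevel(logging.INFO)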

src/databricks/sql/client.py

Lines changed: 4 additions & 0 deletions
@@ -744,6 +744,7 @@ def execute(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False
     ) -> "Cursor":
         """
         Execute a query and wait for execution to complete.
@@ -808,6 +809,7 @@ def execute(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=False,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness
         )
         self.active_result_set = ResultSet(
             self.connection,
@@ -829,6 +831,7 @@ def execute_async(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False
     ) -> "Cursor":
         """
@@ -869,6 +872,7 @@ def execute_async(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=True,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness
         )
 
         return self
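From the caller's side, the new flag is just an extra keyword argument on Cursor.execute and Cursor.execute_async. A minimal sketch with placeholder connection parameters:

    from databricks import sql

    # Placeholder connection parameters; substitute real workspace values.
    with sql.connect(server_hostname="...", http_path="...", access_token="...") as conn:
        with conn.cursor() as cursor:
            # New keyword introduced in this commit; it defaults to False.
            cursor.execute("SELECT 1", enforce_embedded_schema_correctness=True)
            print(cursor.fetchall())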

src/databricks/sql/thrift_backend.py

Lines changed: 9 additions & 3 deletions
@@ -66,7 +66,7 @@
 # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
 _retry_policy = {  # (type, default, min, max)
     "_retry_delay_min": (float, 1, 0.1, 60),
-    "_retry_delay_max": (float, 30, 5, 3600),
+    "_retry_delay_max": (float, 60, 5, 3600),
     "_retry_stop_after_attempts_count": (int, 30, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
     "_retry_delay_default": (float, 5, 1, 60),
@@ -883,6 +883,7 @@ def execute_command(
         use_cloud_fetch=True,
         parameters=[],
         async_op=False,
+        enforce_embedded_schema_correctness=False
     ):
         assert session_handle is not None
 
@@ -898,8 +899,12 @@ def execute_command(
             sessionHandle=session_handle,
             statement=operation,
             runAsync=True,
-            getDirectResults=ttypes.TSparkGetDirectResults(
-                maxRows=max_rows, maxBytes=max_bytes
+            # For async operation we don't want the direct results
+            getDirectResults=None
+            if async_op
+            else ttypes.TSparkGetDirectResults(
+                maxRows=max_rows,
+                maxBytes=max_bytes,
             ),
             canReadArrowResult=True if pyarrow else False,
             canDecompressLZ4Result=lz4_compression,
@@ -910,6 +915,7 @@ def execute_command(
             },
             useArrowNativeTypes=spark_arrow_types,
             parameters=parameters,
+            enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness
         )
         resp = self.make_request(self._client.ExecuteStatement, req)
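For reference, each entry in _retry_policy is a (type, default, min, max) tuple describing how a retry knob is coerced and bounded. The sketch below only illustrates that clamping idea; the helper name is made up and is not the connector's actual binding code.

    # Illustrative only: mimic the (type, default, min, max) semantics of _retry_policy.
    def bind_retry_setting(user_value, spec):
        typ, default, lower, upper = spec
        value = typ(default if user_value is None else user_value)
        return min(max(value, lower), upper)

    # With this commit the default for "_retry_delay_max" becomes 60 seconds.
    print(bind_retry_setting(None, (float, 60, 5, 3600)))   # 60.0
    print(bind_retry_setting(9000, (float, 60, 5, 3600)))   # 3600.0, clamped to the max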

tests/e2e/common/large_queries_mixin.py

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ def test_long_running_query(self):
         scale_factor = 1
         with self.cursor() as cursor:
             while duration < min_duration:
-                assert scale_factor < 512, "Detected infinite loop"
+                assert scale_factor < 1024, "Detected infinite loop"
                 start = time.time()
 
                 cursor.execute(

tests/e2e/test_driver.py

Lines changed: 60 additions & 8 deletions
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
         for i in range(len(cf_result)):
             assert cf_result[i] == noop_result[i]
 
-    def test_execute_async(self):
-        def isExecuting(operation_state):
-            return not operation_state or operation_state in [
-                ttypes.TOperationState.RUNNING_STATE,
-                ttypes.TOperationState.PENDING_STATE,
-            ]
+
+class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
+    def isExecuting(self, operation_state):
+        return not operation_state or operation_state in [
+            ttypes.TOperationState.RUNNING_STATE,
+            ttypes.TOperationState.PENDING_STATE,
+        ]
+
+    def test_execute_async__long_running(self):
 
         long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
         with self.cursor() as cursor:
             cursor.execute_async(long_running_query)
 
             ## Polling after every POLLING_INTERVAL seconds
-            while isExecuting(cursor.get_query_state()):
+            while self.isExecuting(cursor.get_query_state()):
                 time.sleep(self.POLLING_INTERVAL)
                 log.info("Polling the status in test_execute_async")
 
@@ -198,6 +201,55 @@ def isExecuting(operation_state):
 
         assert result[0].asDict() == {"count(1)": 0}
 
+    def test_execute_async__small_result(self):
+        small_result_query = "SELECT 1"
+
+        with self.cursor() as cursor:
+            cursor.execute_async(small_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert result[0].asDict() == {"1": 1}
+
+    def test_execute_async__large_result(self):
+        x_dimension = 1000
+        y_dimension = 1000
+        large_result_query = f"""
+            SELECT
+                x.id AS x_id,
+                y.id AS y_id,
+                FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
+            FROM
+                RANGE({x_dimension}) x
+            JOIN
+                RANGE({y_dimension}) y
+        """
+
+        with self.cursor() as cursor:
+            cursor.execute_async(large_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert len(result) == x_dimension * y_dimension
+
 
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests
@@ -828,4 +880,4 @@ def test_initial_namespace(self):
             cursor.execute("select current_catalog()")
             assert cursor.fetchone()[0] == self.arguments["catalog"]
             cursor.execute("select current_database()")
-            assert cursor.fetchone()[0] == table_name
+            assert cursor.fetchone()[0] == table_name
