Skip to content

Commit c22f6c7

Browse files
authored
Merge branch 'main' into jprakash-db/complex-param
2 parents 8346de6 + 3842583 commit c22f6c7

File tree

8 files changed

+363
-32
lines changed

8 files changed

+363
-32
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# the repo. Unless a later match takes precedence, these
33
# users will be requested for review when someone opens a
44
# pull request.
5-
* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db
5+
* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680

.github/workflows/code-quality-checks.yml

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
name: Code Quality Checks
2-
on:
3-
push:
4-
branches:
5-
- main
6-
pull_request:
7-
branches:
8-
- main
2+
3+
on: [pull_request]
4+
95
jobs:
106
run-unit-tests:
117
runs-on: ubuntu-latest

.github/workflows/integration.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
name: Integration Tests
2+
23
on:
3-
push:
4-
paths-ignore:
5-
- "**.MD"
6-
- "**.md"
4+
push:
5+
branches:
6+
- main
7+
pull_request:
78

89
jobs:
910
run-e2e-tests:

src/databricks/sql/client.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,12 @@ def read(self) -> Optional[OAuthToken]:
214214
# use_cloud_fetch
215215
# Enable use of cloud fetch to extract large query results in parallel via cloud storage
216216

217+
logger.debug(
218+
"Connection.__init__(server_hostname=%s, http_path=%s)",
219+
server_hostname,
220+
http_path,
221+
)
222+
217223
if access_token:
218224
access_token_kv = {"access_token": access_token}
219225
kwargs = {**kwargs, **access_token_kv}
@@ -315,7 +321,13 @@ def __enter__(self) -> "Connection":
315321
return self
316322

317323
def __exit__(self, exc_type, exc_value, traceback):
318-
self.close()
324+
try:
325+
self.close()
326+
except BaseException as e:
327+
logger.warning(f"Exception during connection close in __exit__: {e}")
328+
if exc_type is None:
329+
raise
330+
return False
319331

320332
def __del__(self):
321333
if self.open:
@@ -456,7 +468,14 @@ def __enter__(self) -> "Cursor":
456468
return self
457469

458470
def __exit__(self, exc_type, exc_value, traceback):
459-
self.close()
471+
try:
472+
logger.debug("Cursor context manager exiting, calling close()")
473+
self.close()
474+
except BaseException as e:
475+
logger.warning(f"Exception during cursor close in __exit__: {e}")
476+
if exc_type is None:
477+
raise
478+
return False
460479

461480
def __iter__(self):
462481
if self.active_result_set:
@@ -787,6 +806,9 @@ def execute(
787806
788807
:returns self
789808
"""
809+
logger.debug(
810+
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
811+
)
790812

791813
param_approach = self._determine_parameter_approach(parameters)
792814
if param_approach == ParameterApproach.NONE:
@@ -1163,7 +1185,21 @@ def cancel(self) -> None:
11631185
def close(self) -> None:
11641186
"""Close cursor"""
11651187
self.open = False
1166-
self.active_op_handle = None
1188+
1189+
# Close active operation handle if it exists
1190+
if self.active_op_handle:
1191+
try:
1192+
self.thrift_backend.close_command(self.active_op_handle)
1193+
except RequestError as e:
1194+
if isinstance(e.args[1], CursorAlreadyClosedError):
1195+
logger.info("Operation was canceled by a prior request")
1196+
else:
1197+
logging.warning(f"Error closing operation handle: {e}")
1198+
except Exception as e:
1199+
logging.warning(f"Error closing operation handle: {e}")
1200+
finally:
1201+
self.active_op_handle = None
1202+
11671203
if self.active_result_set:
11681204
self._close_and_clear_active_result_set()
11691205

src/databricks/sql/thrift_backend.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ def __init__(
131131
# max_download_threads
132132
# Number of threads for handling cloud fetch downloads. Defaults to 10
133133

134+
logger.debug(
135+
"ThriftBackend.__init__(server_hostname=%s, port=%s, http_path=%s)",
136+
server_hostname,
137+
port,
138+
http_path,
139+
)
140+
134141
port = port or 443
135142
if kwargs.get("_connection_uri"):
136143
uri = kwargs.get("_connection_uri")
@@ -390,6 +397,8 @@ def attempt_request(attempt):
390397

391398
# TODO: don't use exception handling for GOS polling...
392399

400+
logger.error("ThriftBackend.attempt_request: HTTPError: %s", err)
401+
393402
gos_name = TCLIServiceClient.GetOperationStatus.__name__
394403
if method.__name__ == gos_name:
395404
delay_default = (
@@ -434,6 +443,7 @@ def attempt_request(attempt):
434443
else:
435444
logger.warning(log_string)
436445
except Exception as err:
446+
logger.error("ThriftBackend.attempt_request: Exception: %s", err)
437447
error = err
438448
retry_delay = extract_retry_delay(attempt)
439449
error_message = ThriftBackend._extract_error_message_from_headers(
@@ -888,6 +898,12 @@ def execute_command(
888898
):
889899
assert session_handle is not None
890900

901+
logger.debug(
902+
"ThriftBackend.execute_command(operation=%s, session_handle=%s)",
903+
operation,
904+
session_handle,
905+
)
906+
891907
spark_arrow_types = ttypes.TSparkArrowTypes(
892908
timestampAsArrow=self._use_arrow_native_timestamps,
893909
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1074,6 +1090,7 @@ def fetch_results(
10741090
return queue, resp.hasMoreRows
10751091

10761092
def close_command(self, op_handle):
1093+
logger.debug("ThriftBackend.close_command(op_handle=%s)", op_handle)
10771094
req = ttypes.TCloseOperationReq(operationHandle=op_handle)
10781095
resp = self.make_request(self._client.CloseOperation, req)
10791096
return resp.status

tests/e2e/test_complex_types.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def table_fixture(self, connection_details):
1515
# Create the table
1616
cursor.execute(
1717
"""
18-
CREATE TABLE IF NOT EXISTS pysql_test_complex_types_table (
18+
CREATE TABLE IF NOT EXISTS pysql_e2e_test_complex_types_table (
1919
array_col ARRAY<STRING>,
2020
map_col MAP<STRING, INTEGER>,
2121
struct_col STRUCT<field1: STRING, field2: INTEGER>,
@@ -28,7 +28,7 @@ def table_fixture(self, connection_details):
2828
# Insert a record
2929
cursor.execute(
3030
"""
31-
INSERT INTO pysql_test_complex_types_table
31+
INSERT INTO pysql_e2e_test_complex_types_table
3232
VALUES (
3333
ARRAY('a', 'b', 'c'),
3434
MAP('a', 1, 'b', 2, 'c', 3),
@@ -59,7 +59,7 @@ def test_read_complex_types_as_arrow(self, field, expected_type, table_fixture):
5959

6060
with self.cursor() as cursor:
6161
result = cursor.execute(
62-
"SELECT * FROM pysql_test_complex_types_table LIMIT 1"
62+
"SELECT * FROM pysql_e2e_test_complex_types_table LIMIT 1"
6363
).fetchone()
6464

6565
assert isinstance(result[field], expected_type)
@@ -81,7 +81,7 @@ def test_read_complex_types_as_string(self, field, table_fixture):
8181
extra_params={"_use_arrow_native_complex_types": False}
8282
) as cursor:
8383
result = cursor.execute(
84-
"SELECT * FROM pysql_test_complex_types_table LIMIT 1"
84+
"SELECT * FROM pysql_e2e_test_complex_types_table LIMIT 1"
8585
).fetchone()
8686

8787
assert isinstance(result[field], str)

tests/e2e/test_driver.py

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin
5252

53-
from databricks.sql.exc import SessionAlreadyClosedError
53+
from databricks.sql.exc import SessionAlreadyClosedError, CursorAlreadyClosedError
5454

5555
log = logging.getLogger(__name__)
5656

@@ -820,7 +820,6 @@ def test_close_connection_closes_cursors(self):
820820
ars = cursor.active_result_set
821821

822822
# We must manually run this check because thrift_backend always forces `has_been_closed_server_side` to True
823-
824823
# Cursor op state should be open before connection is closed
825824
status_request = ttypes.TGetOperationStatusReq(
826825
operationHandle=ars.command_id, getProgressUpdate=False
@@ -847,9 +846,104 @@ def test_closing_a_closed_connection_doesnt_fail(self, caplog):
847846
with self.connection() as conn:
848847
# First .close() call is explicit here
849848
conn.close()
850-
851849
assert "Session appears to have been closed already" in caplog.text
852850

851+
conn = None
852+
try:
853+
with pytest.raises(KeyboardInterrupt):
854+
with self.connection() as c:
855+
conn = c
856+
raise KeyboardInterrupt("Simulated interrupt")
857+
finally:
858+
if conn is not None:
859+
assert not conn.open, "Connection should be closed after KeyboardInterrupt"
860+
861+
def test_cursor_close_properly_closes_operation(self):
862+
"""Test that Cursor.close() properly closes the active operation handle on the server."""
863+
with self.connection() as conn:
864+
cursor = conn.cursor()
865+
try:
866+
cursor.execute("SELECT 1 AS test")
867+
assert cursor.active_op_handle is not None
868+
cursor.close()
869+
assert cursor.active_op_handle is None
870+
assert not cursor.open
871+
finally:
872+
if cursor.open:
873+
cursor.close()
874+
875+
conn = None
876+
cursor = None
877+
try:
878+
with self.connection() as c:
879+
conn = c
880+
with pytest.raises(KeyboardInterrupt):
881+
with conn.cursor() as cur:
882+
cursor = cur
883+
raise KeyboardInterrupt("Simulated interrupt")
884+
finally:
885+
if cursor is not None:
886+
assert not cursor.open, "Cursor should be closed after KeyboardInterrupt"
887+
888+
def test_nested_cursor_context_managers(self):
889+
"""Test that nested cursor context managers properly close operations on the server."""
890+
with self.connection() as conn:
891+
with conn.cursor() as cursor1:
892+
cursor1.execute("SELECT 1 AS test1")
893+
assert cursor1.active_op_handle is not None
894+
895+
with conn.cursor() as cursor2:
896+
cursor2.execute("SELECT 2 AS test2")
897+
assert cursor2.active_op_handle is not None
898+
899+
# After the inner context manager exits, cursor2 should no longer be open
900+
assert not cursor2.open
901+
assert cursor2.active_op_handle is None
902+
903+
# After the outer context manager exits, cursor1 should no longer be open
904+
assert not cursor1.open
905+
assert cursor1.active_op_handle is None
906+
907+
def test_cursor_error_handling(self):
908+
"""Test that cursor close handles errors properly to prevent orphaned operations."""
909+
with self.connection() as conn:
910+
cursor = conn.cursor()
911+
912+
cursor.execute("SELECT 1 AS test")
913+
914+
op_handle = cursor.active_op_handle
915+
916+
assert op_handle is not None
917+
918+
# Manually close the operation to simulate server-side closure
919+
conn.thrift_backend.close_command(op_handle)
920+
921+
cursor.close()
922+
923+
assert not cursor.open
924+
925+
def test_result_set_close(self):
926+
"""Test that ResultSet.close() properly closes operations on the server and handles state correctly."""
927+
with self.connection() as conn:
928+
cursor = conn.cursor()
929+
try:
930+
cursor.execute("SELECT * FROM RANGE(10)")
931+
932+
result_set = cursor.active_result_set
933+
assert result_set is not None
934+
935+
initial_op_state = result_set.op_state
936+
937+
result_set.close()
938+
939+
assert result_set.op_state == result_set.thrift_backend.CLOSED_OP_STATE
940+
assert result_set.op_state != initial_op_state
941+
942+
# Closing the result set again should be a no-op and not raise exceptions
943+
result_set.close()
944+
finally:
945+
cursor.close()
946+
853947

854948
# use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
855949
# the 429/503 subsuites separate since they execute under different circumstances.

0 commit comments

Comments
 (0)