2121 CursorAlreadyClosedError ,
2222)
2323from databricks .sql .thrift_api .TCLIService import ttypes
24- from databricks .sql .thrift_backend import ThriftBackend
24+ from databricks .sql .thrift_backend import ThriftDatabricksClient
25+ from databricks .sql .db_client_interface import DatabricksClient
2526from databricks .sql .utils import (
2627 ExecuteResponse ,
2728 ParamEscaper ,
@@ -336,7 +337,7 @@ def cursor(
336337
337338 cursor = Cursor (
338339 self ,
339- self .session .thrift_backend ,
340+ self .session .backend ,
340341 arraysize = arraysize ,
341342 result_buffer_size_bytes = buffer_size_bytes ,
342343 )
@@ -369,7 +370,7 @@ class Cursor:
369370 def __init__ (
370371 self ,
371372 connection : Connection ,
372- thrift_backend : ThriftBackend ,
373+ backend : DatabricksClient ,
373374 result_buffer_size_bytes : int = DEFAULT_RESULT_BUFFER_SIZE_BYTES ,
374375 arraysize : int = DEFAULT_ARRAY_SIZE ,
375376 ) -> None :
@@ -388,7 +389,7 @@ def __init__(
388389 # Note that Cursor closed => active result set closed, but not vice versa
389390 self .open = True
390391 self .executing_command_id = None
391- self .thrift_backend = thrift_backend
392+ self .backend = backend
392393 self .active_op_handle = None
393394 self .escaper = ParamEscaper ()
394395 self .lastrowid = None
@@ -753,7 +754,8 @@ def execute(
753754
754755 self ._check_not_closed ()
755756 self ._close_and_clear_active_result_set ()
756- execute_response = self .thrift_backend .execute_command (
757+ print ("here" )
758+ execute_response = self .backend .execute_command (
757759 operation = prepared_operation ,
758760 session_handle = self .connection .session ._session_handle ,
759761 max_rows = self .arraysize ,
@@ -768,15 +770,15 @@ def execute(
768770 self .active_result_set = ResultSet (
769771 self .connection ,
770772 execute_response ,
771- self .thrift_backend ,
773+ self .backend ,
772774 self .buffer_size_bytes ,
773775 self .arraysize ,
774776 self .connection .use_cloud_fetch ,
775777 )
776778
777779 if execute_response .is_staging_operation :
778780 self ._handle_staging_operation (
779- staging_allowed_local_path = self .thrift_backend .staging_allowed_local_path
781+ staging_allowed_local_path = self .backend .staging_allowed_local_path
780782 )
781783
782784 return self
@@ -816,7 +818,7 @@ def execute_async(
816818
817819 self ._check_not_closed ()
818820 self ._close_and_clear_active_result_set ()
819- self .thrift_backend .execute_command (
821+ self .backend .execute_command (
820822 operation = prepared_operation ,
821823 session_handle = self .connection .session ._session_handle ,
822824 max_rows = self .arraysize ,
@@ -838,7 +840,7 @@ def get_query_state(self) -> "TOperationState":
838840 :return:
839841 """
840842 self ._check_not_closed ()
841- return self .thrift_backend .get_query_state (self .active_op_handle )
843+ return self .backend .get_query_state (self .active_op_handle )
842844
843845 def is_query_pending (self ):
844846 """
@@ -868,20 +870,20 @@ def get_async_execution_result(self):
868870
869871 operation_state = self .get_query_state ()
870872 if operation_state == ttypes .TOperationState .FINISHED_STATE :
871- execute_response = self .thrift_backend .get_execution_result (
873+ execute_response = self .backend .get_execution_result (
872874 self .active_op_handle , self
873875 )
874876 self .active_result_set = ResultSet (
875877 self .connection ,
876878 execute_response ,
877- self .thrift_backend ,
879+ self .backend ,
878880 self .buffer_size_bytes ,
879881 self .arraysize ,
880882 )
881883
882884 if execute_response .is_staging_operation :
883885 self ._handle_staging_operation (
884- staging_allowed_local_path = self .thrift_backend .staging_allowed_local_path
886+ staging_allowed_local_path = self .backend .staging_allowed_local_path
885887 )
886888
887889 return self
@@ -913,7 +915,7 @@ def catalogs(self) -> "Cursor":
913915 """
914916 self ._check_not_closed ()
915917 self ._close_and_clear_active_result_set ()
916- execute_response = self .thrift_backend .get_catalogs (
918+ execute_response = self .backend .get_catalogs (
917919 session_handle = self .connection .session ._session_handle ,
918920 max_rows = self .arraysize ,
919921 max_bytes = self .buffer_size_bytes ,
@@ -922,9 +924,10 @@ def catalogs(self) -> "Cursor":
922924 self .active_result_set = ResultSet (
923925 self .connection ,
924926 execute_response ,
925- self .thrift_backend ,
927+ self .backend ,
926928 self .buffer_size_bytes ,
927929 self .arraysize ,
930+ self .connection .use_cloud_fetch ,
928931 )
929932 return self
930933
@@ -939,7 +942,7 @@ def schemas(
939942 """
940943 self ._check_not_closed ()
941944 self ._close_and_clear_active_result_set ()
942- execute_response = self .thrift_backend .get_schemas (
945+ execute_response = self .backend .get_schemas (
943946 session_handle = self .connection .session ._session_handle ,
944947 max_rows = self .arraysize ,
945948 max_bytes = self .buffer_size_bytes ,
@@ -950,9 +953,10 @@ def schemas(
950953 self .active_result_set = ResultSet (
951954 self .connection ,
952955 execute_response ,
953- self .thrift_backend ,
956+ self .backend ,
954957 self .buffer_size_bytes ,
955958 self .arraysize ,
959+ self .connection .use_cloud_fetch ,
956960 )
957961 return self
958962
@@ -972,7 +976,7 @@ def tables(
972976 self ._check_not_closed ()
973977 self ._close_and_clear_active_result_set ()
974978
975- execute_response = self .thrift_backend .get_tables (
979+ execute_response = self .backend .get_tables (
976980 session_handle = self .connection .session ._session_handle ,
977981 max_rows = self .arraysize ,
978982 max_bytes = self .buffer_size_bytes ,
@@ -985,9 +989,10 @@ def tables(
985989 self .active_result_set = ResultSet (
986990 self .connection ,
987991 execute_response ,
988- self .thrift_backend ,
992+ self .backend ,
989993 self .buffer_size_bytes ,
990994 self .arraysize ,
995+ self .connection .use_cloud_fetch ,
991996 )
992997 return self
993998
@@ -1007,7 +1012,7 @@ def columns(
10071012 self ._check_not_closed ()
10081013 self ._close_and_clear_active_result_set ()
10091014
1010- execute_response = self .thrift_backend .get_columns (
1015+ execute_response = self .backend .get_columns (
10111016 session_handle = self .connection .session ._session_handle ,
10121017 max_rows = self .arraysize ,
10131018 max_bytes = self .buffer_size_bytes ,
@@ -1020,9 +1025,10 @@ def columns(
10201025 self .active_result_set = ResultSet (
10211026 self .connection ,
10221027 execute_response ,
1023- self .thrift_backend ,
1028+ self .backend ,
10241029 self .buffer_size_bytes ,
10251030 self .arraysize ,
1031+ self .connection .use_cloud_fetch ,
10261032 )
10271033 return self
10281034
@@ -1097,7 +1103,7 @@ def cancel(self) -> None:
10971103 This method can be called from another thread.
10981104 """
10991105 if self .active_op_handle is not None :
1100- self .thrift_backend .cancel_command (self .active_op_handle )
1106+ self .backend .cancel_command (self .active_op_handle )
11011107 else :
11021108 logger .warning (
11031109 "Attempting to cancel a command, but there is no "
@@ -1172,7 +1178,7 @@ def __init__(
11721178 self ,
11731179 connection : Connection ,
11741180 execute_response : ExecuteResponse ,
1175- thrift_backend : ThriftBackend ,
1181+ backend : DatabricksClient ,
11761182 result_buffer_size_bytes : int = DEFAULT_RESULT_BUFFER_SIZE_BYTES ,
11771183 arraysize : int = 10000 ,
11781184 use_cloud_fetch : bool = True ,
@@ -1182,8 +1188,10 @@ def __init__(
11821188
11831189 :param connection: The parent connection that was used to execute this command
11841190 :param execute_response: A `ExecuteResponse` class returned by a command execution
1185- :param result_buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch
1186- amount :param arraysize: The max number of rows to fetch at a time (PEP-249)
1191+ :param backend: The DatabricksClient instance to use for fetching results
1192+ :param result_buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch amount
1193+ :param arraysize: The max number of rows to fetch at a time (PEP-249)
1194+ :param use_cloud_fetch: Whether to use cloud fetch for retrieving results
11871195 """
11881196 self .connection = connection
11891197 self .command_id = execute_response .command_handle
@@ -1193,7 +1201,7 @@ def __init__(
11931201 self .buffer_size_bytes = result_buffer_size_bytes
11941202 self .lz4_compressed = execute_response .lz4_compressed
11951203 self .arraysize = arraysize
1196- self .thrift_backend = thrift_backend
1204+ self .backend = backend
11971205 self .description = execute_response .description
11981206 self ._arrow_schema_bytes = execute_response .arrow_schema_bytes
11991207 self ._next_row_index = 0
@@ -1216,8 +1224,15 @@ def __iter__(self):
12161224 break
12171225
12181226 def _fill_results_buffer (self ):
1219- # At initialization or if the server does not have cloud fetch result links available
1220- results , has_more_rows = self .thrift_backend .fetch_results (
1227+ if not isinstance (self .backend , ThriftDatabricksClient ):
1228+ # This specific logic is for Thrift. SEA will have its own way.
1229+ raise NotImplementedError (
1230+ "Fetching further result batches is currently only implemented for the Thrift backend."
1231+ )
1232+
1233+ # Now we know self.backend is ThriftDatabricksClient, so it has fetch_results
1234+ thrift_backend_instance = self .backend # type: ThriftDatabricksClient
1235+ results , has_more_rows = thrift_backend_instance .fetch_results (
12211236 op_handle = self .command_id ,
12221237 max_rows = self .arraysize ,
12231238 max_bytes = self .buffer_size_bytes ,
@@ -1433,19 +1448,20 @@ def close(self) -> None:
14331448 If the connection has not been closed, and the cursor has not already
14341449 been closed on the server for some other reason, issue a request to the server to close it.
14351450 """
1451+ # TODO: the state is still thrift specific, define some ENUM for status that each service has to map to
14361452 try :
14371453 if (
1438- self .op_state != self . thrift_backend . CLOSED_OP_STATE
1454+ self .op_state != ttypes . TOperationState . CLOSED_STATE
14391455 and not self .has_been_closed_server_side
14401456 and self .connection .open
14411457 ):
1442- self .thrift_backend .close_command (self .command_id )
1458+ self .backend .close_command (self .command_id )
14431459 except RequestError as e :
14441460 if isinstance (e .args [1 ], CursorAlreadyClosedError ):
14451461 logger .info ("Operation was canceled by a prior request" )
14461462 finally :
14471463 self .has_been_closed_server_side = True
1448- self .op_state = self . thrift_backend . CLOSED_OP_STATE
1464+ self .op_state = ttypes . TOperationState . CLOSED_STATE
14491465
14501466 @staticmethod
14511467 def _get_schema_description (table_schema_message ):
0 commit comments