Skip to content

Commit e637408

Browse files
committed
Built the basic flow for the async pipeline - testing is remaining
1 parent ecdddba commit e637408

File tree

2 files changed

+16
-5
lines changed

2 files changed

+16
-5
lines changed

src/databricks/sql/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,7 @@ def execute(
733733
self,
734734
operation: str,
735735
parameters: Optional[TParameterCollection] = None,
736+
perform_async = True
736737
) -> "Cursor":
737738
"""
738739
Execute a query and wait for execution to complete.
@@ -796,6 +797,7 @@ def execute(
796797
cursor=self,
797798
use_cloud_fetch=self.connection.use_cloud_fetch,
798799
parameters=prepared_params,
800+
perform_async=perform_async,
799801
)
800802
self.active_result_set = ResultSet(
801803
self.connection,
@@ -812,6 +814,11 @@ def execute(
812814

813815
return self
814816

817+
def executeAsync(self,
818+
operation: str,
819+
parameters: Optional[TParameterCollection] = None,):
820+
return execute(operation, parameters, True)
821+
815822
def executemany(self, operation, seq_of_parameters):
816823
"""
817824
Execute the operation once for every set of passed in parameters.

src/databricks/sql/thrift_backend.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,7 @@ def execute_command(
817817
cursor,
818818
use_cloud_fetch=True,
819819
parameters=[],
820+
perform_async=False,
820821
):
821822
assert session_handle is not None
822823

@@ -846,7 +847,8 @@ def execute_command(
846847
parameters=parameters,
847848
)
848849
resp = self.make_request(self._client.ExecuteStatement, req)
849-
return self._handle_execute_response(resp, cursor)
850+
851+
return self._handle_execute_response(resp, cursor, perform_async)
850852

851853
def get_catalogs(self, session_handle, max_rows, max_bytes, cursor):
852854
assert session_handle is not None
@@ -934,14 +936,16 @@ def get_columns(
934936
resp = self.make_request(self._client.GetColumns, req)
935937
return self._handle_execute_response(resp, cursor)
936938

937-
def _handle_execute_response(self, resp, cursor):
939+
def _handle_execute_response(self, resp, cursor, perform_async=False):
938940
cursor.active_op_handle = resp.operationHandle
939941
self._check_direct_results_for_error(resp.directResults)
940942

941-
final_operation_state = self._wait_until_command_done(
943+
if perform_async:
944+
final_operation_state=ttypes.TStatusCode.STILL_EXECUTING_STATUS
945+
else:
946+
final_operation_state=self._wait_until_command_done(
942947
resp.operationHandle,
943-
resp.directResults and resp.directResults.operationStatus,
944-
)
948+
resp.directResults and resp.directResults.operationStatus)
945949

946950
return self._results_message_to_execute_response(resp, final_operation_state)
947951

0 commit comments

Comments
 (0)