|
30 | 30 | BackendType, |
31 | 31 | ExecuteResponse, |
32 | 32 | ) |
33 | | -from databricks.sql.exc import ServerOperationError |
| 33 | +from databricks.sql.exc import DatabaseError, ServerOperationError |
34 | 34 | from databricks.sql.backend.sea.utils.http_client import SeaHttpClient |
35 | 35 | from databricks.sql.thrift_api.TCLIService import ttypes |
36 | 36 | from databricks.sql.types import SSLOptions |
@@ -396,6 +396,42 @@ def _results_message_to_execute_response(self, sea_response, command_id): |
396 | 396 |
|
397 | 397 | return execute_response, result_data_obj, manifest_obj |
398 | 398 |
|
| 399 | + def _check_command_not_in_failed_or_closed_state( |
| 400 | + self, state: CommandState, command_id: CommandId |
| 401 | + ) -> None: |
| 402 | + if state == CommandState.CLOSED: |
| 403 | + raise DatabaseError( |
| 404 | + "Command {} unexpectedly closed server side".format(command_id), |
| 405 | + { |
| 406 | + "operation-id": command_id, |
| 407 | + }, |
| 408 | + ) |
| 409 | + if state == CommandState.FAILED: |
| 410 | + raise ServerOperationError( |
| 411 | + "Command {} failed".format(command_id), |
| 412 | + { |
| 413 | + "operation-id": command_id, |
| 414 | + }, |
| 415 | + ) |
| 416 | + |
| 417 | + def _wait_until_command_done( |
| 418 | + self, response: ExecuteStatementResponse |
| 419 | + ) -> CommandState: |
| 420 | + """ |
| 421 | + Wait until a command is done. |
| 422 | + """ |
| 423 | + |
| 424 | + state = response.status.state |
| 425 | + command_id = CommandId.from_sea_statement_id(response.statement_id) |
| 426 | + |
| 427 | + while state in [CommandState.PENDING, CommandState.RUNNING]: |
| 428 | + time.sleep(self.POLL_INTERVAL_SECONDS) |
| 429 | + state = self.get_query_state(command_id) |
| 430 | + |
| 431 | + self._check_command_not_in_failed_or_closed_state(state, command_id) |
| 432 | + |
| 433 | + return state |
| 434 | + |
399 | 435 | def execute_command( |
400 | 436 | self, |
401 | 437 | operation: str, |
@@ -493,24 +529,7 @@ def execute_command( |
493 | 529 | if async_op: |
494 | 530 | return None |
495 | 531 |
|
496 | | - # For synchronous operation, wait for the statement to complete |
497 | | - status = response.status |
498 | | - state = status.state |
499 | | - |
500 | | - # Keep polling until we reach a terminal state |
501 | | - while state in [CommandState.PENDING, CommandState.RUNNING]: |
502 | | - time.sleep(0.5) # add a small delay to avoid excessive API calls |
503 | | - state = self.get_query_state(command_id) |
504 | | - |
505 | | - if state != CommandState.SUCCEEDED: |
506 | | - raise ServerOperationError( |
507 | | - f"Statement execution did not succeed: {status.error.message if status.error else 'Unknown error'}", |
508 | | - { |
509 | | - "operation-id": command_id.to_sea_statement_id(), |
510 | | - "diagnostic-info": None, |
511 | | - }, |
512 | | - ) |
513 | | - |
| 532 | + self._wait_until_command_done(response) |
514 | 533 | return self.get_execution_result(command_id, cursor) |
515 | 534 |
|
516 | 535 | def cancel_command(self, command_id: CommandId) -> None: |
|
0 commit comments