From ce27f75f1472dc97a8b92c1473b99a17d774d37a Mon Sep 17 00:00:00 2001 From: Adam Fiedler Date: Tue, 15 Apr 2025 14:43:27 +0200 Subject: [PATCH 1/2] feat: cancel tokens bouncing jira: cq-1123 risk: high --- .../gooddata_sdk/compute/model/execution.py | 28 +++++++++++++++---- gooddata-sdk/gooddata_sdk/compute/service.py | 1 + 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/gooddata-sdk/gooddata_sdk/compute/model/execution.py b/gooddata-sdk/gooddata_sdk/compute/model/execution.py index 76b2783b0..55615f69c 100644 --- a/gooddata-sdk/gooddata_sdk/compute/model/execution.py +++ b/gooddata-sdk/gooddata_sdk/compute/model/execution.py @@ -60,12 +60,14 @@ def __init__( filters: Optional[list[Filter]], dimensions: list[TableDimension], totals: Optional[list[TotalDefinition]] = None, + is_cancellable: bool = False, ) -> None: self._attributes = attributes or [] self._metrics = metrics or [] self._filters = filters or [] self._dimensions = [dim for dim in dimensions if dim.item_ids is not None] self._totals = totals + self._is_cancellable = is_cancellable @property def attributes(self) -> list[Attribute]: @@ -98,6 +100,10 @@ def is_one_dim(self) -> bool: def is_two_dim(self) -> bool: return len(self.dimensions) == 2 + @property + def is_cancellable(self) -> bool: + return self._is_cancellable + def _create_value_sort_key(self, sort_key: dict) -> models.SortKey: sort_key_value = sort_key["value"] return models.SortKey( @@ -296,6 +302,7 @@ def __init__( api_client: GoodDataApiClient, workspace_id: str, execution_response: models.AfmExecutionResponse, + cancel_token: Optional[str] = None, ): self._api_client = api_client self._actions_api = self._api_client.actions_api @@ -303,6 +310,7 @@ def __init__( self._exec_response: models.ExecutionResponse = execution_response["execution_response"] self._afm_exec_response = execution_response + self._cancel_token = cancel_token @property def workspace_id(self) -> str: @@ -316,6 +324,10 @@ def result_id(self) -> str: def dimensions(self) -> Any: return self._exec_response["dimensions"] + @property + def cancel_token(self) -> Optional[str]: + return self._cancel_token + def read_result(self, limit: Union[int, list[int]], offset: Union[None, int, list[int]] = None) -> ExecutionResult: """ Reads from the execution result. @@ -335,6 +347,7 @@ def read_result(self, limit: Union[int, list[int]], offset: Union[None, int, lis limit=_limit, _check_return_type=False, _return_http_data_only=False, + **({"x_gdc_cancel_token": self.cancel_token} if self.cancel_token else {}), ) custom_headers = self._api_client.custom_headers if "X-GDC-TRACE-ID" in custom_headers and "X-GDC-TRACE-ID" in http_headers: @@ -351,7 +364,7 @@ def __str__(self) -> str: return self.__repr__() def __repr__(self) -> str: - return f"BareExecutionResponse(workspace_id={self.workspace_id}, result_id={self.result_id})" + return f"BareExecutionResponse(workspace_id={self.workspace_id}, result_id={self.result_id}, cancel_token={self.cancel_token})" class Execution: @@ -367,12 +380,11 @@ def __init__( workspace_id: str, exec_def: ExecutionDefinition, response: models.AfmExecutionResponse, + cancel_token: Optional[str] = None, ): self._exec_def = exec_def self._bare_exec_response = BareExecutionResponse( - api_client=api_client, - workspace_id=workspace_id, - execution_response=response, + api_client=api_client, workspace_id=workspace_id, execution_response=response, cancel_token=cancel_token ) @property @@ -395,6 +407,10 @@ def result_id(self) -> str: def dimensions(self) -> Any: return self.bare_exec_response._exec_response["dimensions"] + @property + def cancel_token(self) -> Optional[str]: + return self.bare_exec_response.cancel_token + def get_labels_and_formats(self) -> tuple[dict[str, str], dict[str, str]]: """ Extracts labels and custom measure formats from the execution response. @@ -425,7 +441,9 @@ def __str__(self) -> str: return self.__repr__() def __repr__(self) -> str: - return f"Execution(workspace_id={self.workspace_id}, result_id={self.bare_exec_response.result_id})" + return ( + f"Execution(workspace_id={self.workspace_id}, result_id={self.result_id}, cancel_token={self.cancel_token})" + ) # Originally ExecutionResponse contained also ExecutionDefinition which was not correct, therefore Execution class was diff --git a/gooddata-sdk/gooddata_sdk/compute/service.py b/gooddata-sdk/gooddata_sdk/compute/service.py index 3650500df..23f6136eb 100644 --- a/gooddata-sdk/gooddata_sdk/compute/service.py +++ b/gooddata-sdk/gooddata_sdk/compute/service.py @@ -41,6 +41,7 @@ def for_exec_def(self, workspace_id: str, exec_def: ExecutionDefinition) -> Exec workspace_id=workspace_id, exec_def=exec_def, response=response, + cancel_token=response.headers.get("X-GDC-CANCEL-TOKEN") if exec_def.is_cancellable else None, ) def retrieve_result_cache_metadata(self, workspace_id: str, result_id: str) -> ResultCacheMetadata: From a60c5a6ddb762ae909efbc07e5c8c872bad8d3c8 Mon Sep 17 00:00:00 2001 From: Adam Fiedler Date: Tue, 15 Apr 2025 16:29:25 +0200 Subject: [PATCH 2/2] feat: add support for cancel api jira: cq-1123 risk: low --- gooddata-sdk/gooddata_sdk/compute/service.py | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/gooddata-sdk/gooddata_sdk/compute/service.py b/gooddata-sdk/gooddata_sdk/compute/service.py index 23f6136eb..bbf810aee 100644 --- a/gooddata-sdk/gooddata_sdk/compute/service.py +++ b/gooddata-sdk/gooddata_sdk/compute/service.py @@ -3,6 +3,7 @@ import logging +from gooddata_api_client.model.afm_cancel_tokens import AfmCancelTokens from gooddata_api_client.model.chat_history_request import ChatHistoryRequest from gooddata_api_client.model.chat_history_result import ChatHistoryResult from gooddata_api_client.model.chat_request import ChatRequest @@ -108,3 +109,24 @@ def ai_chat_history_reset(self, workspace_id: str) -> None: """ chat_history_request = ChatHistoryRequest(reset=True) self._actions_api.ai_chat_history(workspace_id, chat_history_request, _check_return_type=False) + + def cancel_executions(self, executions: list[Execution]) -> None: + """ + Try to cancel given executions using the cancel api endpoint. + + *Note that this is currently a noop, we will be enabling this functionality soon.* + + Args: + executions: list of executions to send for cancellation + """ + workspace_to_tokens: dict[str, set[str]] = {} + + for execution in executions: + if not workspace_to_tokens[execution.workspace_id]: + workspace_to_tokens[execution.workspace_id] = set() + + if execution.cancel_token: + workspace_to_tokens[execution.workspace_id].add(execution.cancel_token) + + for workspace_id, token_ids in workspace_to_tokens.items(): + self._actions_api.cancel_executions(workspace_id, AfmCancelTokens(list(token_ids)))