Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions gooddata-sdk/gooddata_sdk/compute/model/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ def __init__(
filters: Optional[list[Filter]],
dimensions: list[TableDimension],
totals: Optional[list[TotalDefinition]] = None,
is_cancellable: bool = False,
) -> None:
self._attributes = attributes or []
self._metrics = metrics or []
self._filters = filters or []
self._dimensions = [dim for dim in dimensions if dim.item_ids is not None]
self._totals = totals
self._is_cancellable = is_cancellable

@property
def attributes(self) -> list[Attribute]:
Expand Down Expand Up @@ -98,6 +100,10 @@ def is_one_dim(self) -> bool:
def is_two_dim(self) -> bool:
return len(self.dimensions) == 2

@property
def is_cancellable(self) -> bool:
return self._is_cancellable

def _create_value_sort_key(self, sort_key: dict) -> models.SortKey:
sort_key_value = sort_key["value"]
return models.SortKey(
Expand Down Expand Up @@ -296,13 +302,15 @@ def __init__(
api_client: GoodDataApiClient,
workspace_id: str,
execution_response: models.AfmExecutionResponse,
cancel_token: Optional[str] = None,
):
self._api_client = api_client
self._actions_api = self._api_client.actions_api
self._workspace_id = workspace_id

self._exec_response: models.ExecutionResponse = execution_response["execution_response"]
self._afm_exec_response = execution_response
self._cancel_token = cancel_token

@property
def workspace_id(self) -> str:
Expand All @@ -316,6 +324,10 @@ def result_id(self) -> str:
def dimensions(self) -> Any:
return self._exec_response["dimensions"]

@property
def cancel_token(self) -> Optional[str]:
return self._cancel_token

def read_result(self, limit: Union[int, list[int]], offset: Union[None, int, list[int]] = None) -> ExecutionResult:
"""
Reads from the execution result.
Expand All @@ -335,6 +347,7 @@ def read_result(self, limit: Union[int, list[int]], offset: Union[None, int, lis
limit=_limit,
_check_return_type=False,
_return_http_data_only=False,
**({"x_gdc_cancel_token": self.cancel_token} if self.cancel_token else {}),
)
custom_headers = self._api_client.custom_headers
if "X-GDC-TRACE-ID" in custom_headers and "X-GDC-TRACE-ID" in http_headers:
Expand All @@ -351,7 +364,7 @@ def __str__(self) -> str:
return self.__repr__()

def __repr__(self) -> str:
return f"BareExecutionResponse(workspace_id={self.workspace_id}, result_id={self.result_id})"
return f"BareExecutionResponse(workspace_id={self.workspace_id}, result_id={self.result_id}, cancel_token={self.cancel_token})"


class Execution:
Expand All @@ -367,12 +380,11 @@ def __init__(
workspace_id: str,
exec_def: ExecutionDefinition,
response: models.AfmExecutionResponse,
cancel_token: Optional[str] = None,
):
self._exec_def = exec_def
self._bare_exec_response = BareExecutionResponse(
api_client=api_client,
workspace_id=workspace_id,
execution_response=response,
api_client=api_client, workspace_id=workspace_id, execution_response=response, cancel_token=cancel_token
)

@property
Expand All @@ -395,6 +407,10 @@ def result_id(self) -> str:
def dimensions(self) -> Any:
return self.bare_exec_response._exec_response["dimensions"]

@property
def cancel_token(self) -> Optional[str]:
return self.bare_exec_response.cancel_token

def get_labels_and_formats(self) -> tuple[dict[str, str], dict[str, str]]:
"""
Extracts labels and custom measure formats from the execution response.
Expand Down Expand Up @@ -425,7 +441,9 @@ def __str__(self) -> str:
return self.__repr__()

def __repr__(self) -> str:
return f"Execution(workspace_id={self.workspace_id}, result_id={self.bare_exec_response.result_id})"
return (
f"Execution(workspace_id={self.workspace_id}, result_id={self.result_id}, cancel_token={self.cancel_token})"
)


# Originally ExecutionResponse contained also ExecutionDefinition which was not correct, therefore Execution class was
Expand Down
23 changes: 23 additions & 0 deletions gooddata-sdk/gooddata_sdk/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging

from gooddata_api_client.model.afm_cancel_tokens import AfmCancelTokens
from gooddata_api_client.model.chat_history_request import ChatHistoryRequest
from gooddata_api_client.model.chat_history_result import ChatHistoryResult
from gooddata_api_client.model.chat_request import ChatRequest
Expand Down Expand Up @@ -41,6 +42,7 @@ def for_exec_def(self, workspace_id: str, exec_def: ExecutionDefinition) -> Exec
workspace_id=workspace_id,
exec_def=exec_def,
response=response,
cancel_token=response.headers.get("X-GDC-CANCEL-TOKEN") if exec_def.is_cancellable else None,
)

def retrieve_result_cache_metadata(self, workspace_id: str, result_id: str) -> ResultCacheMetadata:
Expand Down Expand Up @@ -107,3 +109,24 @@ def ai_chat_history_reset(self, workspace_id: str) -> None:
"""
chat_history_request = ChatHistoryRequest(reset=True)
self._actions_api.ai_chat_history(workspace_id, chat_history_request, _check_return_type=False)

def cancel_executions(self, executions: list[Execution]) -> None:
"""
Try to cancel given executions using the cancel api endpoint.

*Note that this is currently a noop, we will be enabling this functionality soon.*

Args:
executions: list of executions to send for cancellation
"""
workspace_to_tokens: dict[str, set[str]] = {}

for execution in executions:
if not workspace_to_tokens[execution.workspace_id]:
workspace_to_tokens[execution.workspace_id] = set()

if execution.cancel_token:
workspace_to_tokens[execution.workspace_id].add(execution.cancel_token)

for workspace_id, token_ids in workspace_to_tokens.items():
self._actions_api.cancel_executions(workspace_id, AfmCancelTokens(list(token_ids)))
Loading