From 2272a9552f3a777cfc2d0dc67a32a4af012e5459 Mon Sep 17 00:00:00 2001 From: Dan Homola Date: Thu, 12 Jun 2025 11:51:50 +0200 Subject: [PATCH 1/3] feat: add on_execution_submitted to gooddata_pandas Users can now provide a callback that will be called with the Execution object as soon as it is available. This is useful for getting some of the information available in the Execution object before the actual data is loaded. The Execution object can also be used to cancel the execution result operation. JIRA: CQ-1387 risk: low --- .../gooddata_pandas/data_access.py | 13 +++- gooddata-pandas/gooddata_pandas/dataframe.py | 72 +++++++++++++++---- gooddata-pandas/gooddata_pandas/series.py | 13 +++- 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/gooddata-pandas/gooddata_pandas/data_access.py b/gooddata-pandas/gooddata_pandas/data_access.py index 89865bc04..74bb69ba1 100644 --- a/gooddata-pandas/gooddata_pandas/data_access.py +++ b/gooddata-pandas/gooddata_pandas/data_access.py @@ -1,12 +1,13 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Any, Optional, Union +from typing import Any, Callable, Optional, Union from gooddata_sdk import ( Attribute, AttributeFilter, CatalogAttribute, + Execution, ExecutionDefinition, ExecutionResponse, Filter, @@ -412,6 +413,7 @@ def compute_and_extract( columns: ColumnsDef, index_by: Optional[IndexDef] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[dict, dict]: """ Convenience function that computes and extracts data from the execution response. @@ -422,14 +424,16 @@ def compute_and_extract( columns (ColumnsDef): The columns definition. index_by (Optional[IndexDef]): The index definition, if any. filter_by (Optional[Union[Filter, list[Filter]]]): A filter or a list of filters, if any. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: tuple: A tuple containing the following dictionaries: - dict: A dictionary with data for each column in `columns`. - dict: A dictionary with data for constructing index(es) for each index in index_by. - Note: For convenience it is possible to pass just single index. in that case the index dict will contain exactly - one key of '0' (just get first value from dict when consuming the result). + Note: For convenience, it is possible to pass just a single index. In that case, the index dict will contain exactly + one key of '0' (just get the first value from dict when consuming the result). """ result = _compute( sdk=sdk, @@ -441,6 +445,9 @@ def compute_and_extract( response, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result + if on_execution_submitted is not None: + on_execution_submitted(response) + exec_def = response.exec_def cols = list(columns.keys()) diff --git a/gooddata-pandas/gooddata_pandas/dataframe.py b/gooddata-pandas/gooddata_pandas/dataframe.py index 8045baa92..d90ae14da 100644 --- a/gooddata-pandas/gooddata_pandas/dataframe.py +++ b/gooddata-pandas/gooddata_pandas/dataframe.py @@ -1,13 +1,14 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Optional, Union +from typing import Callable, Optional, Union import pandas from gooddata_api_client import models from gooddata_sdk import ( Attribute, BareExecutionResponse, + Execution, ExecutionDefinition, Filter, GoodDataSdk, @@ -68,19 +69,25 @@ def __init__(self, sdk: GoodDataSdk, workspace_id: str) -> None: self._workspace_id = workspace_id def indexed( - self, index_by: IndexDef, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None + self, + index_by: IndexDef, + columns: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame indexed by values of the label. The data frame columns will be created from either metrics or other label values. - Note that depending on composition of the labels, the DataFrame's index may or may not be unique. + Note that depending on the composition of the labels, the DataFrame's index may or may not be unique. Args: index_by (IndexDef): One or more labels to index by. columns (ColumnsDef): Dictionary mapping column name to its definition. filter_by (Optional[Union[Filter, list[Filter]]]): Optional filters to apply during computation on the server. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -91,6 +98,7 @@ def indexed( columns=columns, index_by=index_by, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) _idx = make_pandas_index(index) @@ -98,7 +106,10 @@ def indexed( return pandas.DataFrame(data=data, index=_idx) def not_indexed( - self, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None + self, + columns: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame with columns created from metrics and or labels. @@ -107,21 +118,33 @@ def not_indexed( columns (ColumnsDef): Dictionary mapping column name to its definition. filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during computation on the server. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. """ - data, _ = compute_and_extract(self._sdk, self._workspace_id, columns=columns, filter_by=filter_by) + data, _ = compute_and_extract( + self._sdk, + self._workspace_id, + columns=columns, + filter_by=filter_by, + on_execution_submitted=on_execution_submitted, + ) return pandas.DataFrame(data=data) def for_items( - self, items: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None, auto_index: bool = True + self, + items: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + auto_index: bool = True, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame for named items. This is a convenience method that will create DataFrame with or - without index based on the context of the items that you pass. + without an index based on the context of the items that you pass. Args: items (ColumnsDef): Dictionary mapping item name to its definition. @@ -129,6 +152,8 @@ def for_items( on the server. auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents of the items. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -157,9 +182,15 @@ def for_items( index_by=resolved_attr_cols, columns=resolved_measure_cols, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) - def for_visualization(self, visualization_id: str, auto_index: bool = True) -> pandas.DataFrame: + def for_visualization( + self, + visualization_id: str, + auto_index: bool = True, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, + ) -> pandas.DataFrame: """ Creates a data frame with columns based on the content of the visualization with the provided identifier. @@ -167,6 +198,8 @@ def for_visualization(self, visualization_id: str, auto_index: bool = True) -> p visualization_id (str): Visualization identifier. auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents of the visualization. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -181,22 +214,31 @@ def for_visualization(self, visualization_id: str, auto_index: bool = True) -> p **{naming.col_name_for_metric(m): m.as_computable() for m in visualization.metrics}, } - return self.for_items(columns, filter_by=filter_by, auto_index=auto_index) + return self.for_items( + columns, filter_by=filter_by, auto_index=auto_index, on_execution_submitted=on_execution_submitted + ) def for_created_visualization( - self, created_visualizations_response: dict + self, + created_visualizations_response: dict, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[pandas.DataFrame, DataFrameMetadata]: """ Creates a data frame using a created visualization. Args: created_visualizations_response (dict): Created visualization response. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. """ execution_definition = self._sdk.compute.build_exec_def_from_chat_result(created_visualizations_response) - return self.for_exec_def(exec_def=execution_definition) + return self.for_exec_def( + exec_def=execution_definition, + on_execution_submitted=on_execution_submitted, + ) def result_cache_metadata_for_exec_result_id(self, result_id: str) -> ResultCacheMetadata: """ @@ -217,6 +259,7 @@ def for_exec_def( result_size_dimensions_limits: ResultSizeDimensions = (), result_size_bytes_limit: Optional[int] = None, page_size: int = _DEFAULT_PAGE_SIZE, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[pandas.DataFrame, DataFrameMetadata]: """ Creates a data frame using an execution definition. @@ -247,6 +290,8 @@ def for_exec_def( result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions. result_size_bytes_limit (Optional[int]): Maximum size of result in bytes. page_size (int): Number of records per page. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata. @@ -257,6 +302,9 @@ def for_exec_def( execution = self._sdk.compute.for_exec_def(workspace_id=self._workspace_id, exec_def=exec_def) result_cache_metadata = self.result_cache_metadata_for_exec_result_id(execution.result_id) + if on_execution_submitted is not None: + on_execution_submitted(execution) + return convert_execution_response_to_dataframe( execution_response=execution.bare_exec_response, result_cache_metadata=result_cache_metadata, @@ -302,7 +350,7 @@ def for_exec_result_id( label_overrides (Optional[LabelOverrides]): Label overrides for metrics and attributes. result_cache_metadata (Optional[ResultCacheMetadata]): Cache metadata for the execution result. result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions. - result_size_bytes_limit (Optional[int]): Maximum size of result in bytes. + result_size_bytes_limit (Optional[int]): Maximum size of the result in bytes. use_local_ids_in_headers (bool): Use local identifier in headers. use_primary_labels_in_attributes (bool): Use primary labels in attributes. page_size (int): Number of records per page. diff --git a/gooddata-pandas/gooddata_pandas/series.py b/gooddata-pandas/gooddata_pandas/series.py index 4d84e95ba..d47cc214f 100644 --- a/gooddata-pandas/gooddata_pandas/series.py +++ b/gooddata-pandas/gooddata_pandas/series.py @@ -1,10 +1,10 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Optional, Union +from typing import Callable, Optional, Union import pandas -from gooddata_sdk import Attribute, Filter, GoodDataSdk, ObjId, SimpleMetric +from gooddata_sdk import Attribute, Execution, Filter, GoodDataSdk, ObjId, SimpleMetric from gooddata_pandas.data_access import compute_and_extract from gooddata_pandas.utils import IndexDef, LabelItemDef, make_pandas_index @@ -28,6 +28,7 @@ def indexed( index_by: IndexDef, data_by: Union[SimpleMetric, str, ObjId, Attribute], filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.Series: """Creates pandas Series from data points calculated from a single `data_by`. @@ -61,6 +62,9 @@ def indexed( - object identifier: ``ObjId(id='some_label_id', type='')`` - Attribute or Metric depending on type of filter + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. + Returns: pandas.Series: pandas series instance """ @@ -71,6 +75,7 @@ def indexed( index_by=index_by, columns={"_series": data_by}, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) _idx = make_pandas_index(index) @@ -82,6 +87,7 @@ def not_indexed( data_by: Union[SimpleMetric, str, ObjId, Attribute], granularity: Optional[Union[list[LabelItemDef], IndexDef]] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.Series: """ Creates a pandas.Series from data points calculated from a single `data_by` without constructing an index. @@ -108,6 +114,8 @@ def not_indexed( - ObjId: ObjId(id='some_label_id', type='') - Attribute or Metric depending on the type of filter Defaults to None. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.Series: The resulting pandas Series instance. @@ -124,6 +132,7 @@ def not_indexed( index_by=_index, columns={"_series": data_by}, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) return pandas.Series(data=data["_series"]) From f67ee61b145e0b1f1271ae8149babe0ebab6c8a8 Mon Sep 17 00:00:00 2001 From: Dan Homola Date: Thu, 12 Jun 2025 11:55:15 +0200 Subject: [PATCH 2/3] chore: use Execution in data_access.py The ExecutionResponse is just an alias so unify the types in the file to keep things simpler. JIRA: CQ-1387 risk: low --- gooddata-pandas/gooddata_pandas/data_access.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/gooddata-pandas/gooddata_pandas/data_access.py b/gooddata-pandas/gooddata_pandas/data_access.py index 74bb69ba1..0ecdb9051 100644 --- a/gooddata-pandas/gooddata_pandas/data_access.py +++ b/gooddata-pandas/gooddata_pandas/data_access.py @@ -9,7 +9,6 @@ CatalogAttribute, Execution, ExecutionDefinition, - ExecutionResponse, Filter, GoodDataSdk, Metric, @@ -258,7 +257,7 @@ def _compute( columns: ColumnsDef, index_by: Optional[IndexDef] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, -) -> tuple[ExecutionResponse, dict[str, int], dict[str, int], dict[str, int]]: +) -> tuple[Execution, dict[str, int], dict[str, int], dict[str, int]]: """ Internal function that computes an execution-by-convention to retrieve data for a data frame with the provided columns, optionally indexed by the index_by label and optionally filtered. @@ -272,7 +271,7 @@ def _compute( Returns: tuple: A tuple containing the following elements: - - ExecutionResponse: The execution response. + - Execution: The execution response. - dict[str, int]: A mapping of pandas column names to attribute dimension indices. - dict[str, int]: A mapping of pandas column names to metric dimension indices. - dict[str, int]: A mapping of pandas index names to attribute dimension indices. @@ -300,12 +299,12 @@ def _compute( # -def _extract_for_metrics_only(response: ExecutionResponse, cols: list, col_to_metric_idx: dict) -> dict: +def _extract_for_metrics_only(response: Execution, cols: list, col_to_metric_idx: dict) -> dict: """ Internal function that extracts data for metrics-only columns when there are no attribute columns. Args: - response (ExecutionResponse): The execution response to extract data from. + response (Execution): The execution response to extract data from. cols (list): A list of column names. col_to_metric_idx (dict): A mapping of pandas column names to metric dimension indices. @@ -346,7 +345,7 @@ def _typed_result(attributes: list[CatalogAttribute], attribute: Attribute, resu def _extract_from_attributes_and_maybe_metrics( - response: ExecutionResponse, + response: Execution, attributes: list[CatalogAttribute], cols: list[str], col_to_attr_idx: dict[str, int], @@ -358,7 +357,7 @@ def _extract_from_attributes_and_maybe_metrics( optionally metrics columns. Args: - response (ExecutionResponse): The execution response to extract data from. + response (Execution): The execution response to extract data from. attributes (list[CatalogAttribute]): The catalog of attributes. cols (list[str]): A list of column names. col_to_attr_idx (dict[str, int]): A mapping of pandas column names to attribute dimension indices. From a5eef85f78d72e1b83e4d6b11ac58324f533dc20 Mon Sep 17 00:00:00 2001 From: Dan Homola Date: Thu, 12 Jun 2025 14:37:28 +0200 Subject: [PATCH 3/3] refactor: use more adequate parameter names in data_access.py Now that the type is Execution, the corresponding parameters should be called the same. JIRA: CQ-1387 risk: low --- .../gooddata_pandas/data_access.py | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/gooddata-pandas/gooddata_pandas/data_access.py b/gooddata-pandas/gooddata_pandas/data_access.py index 0ecdb9051..df76c1771 100644 --- a/gooddata-pandas/gooddata_pandas/data_access.py +++ b/gooddata-pandas/gooddata_pandas/data_access.py @@ -299,20 +299,20 @@ def _compute( # -def _extract_for_metrics_only(response: Execution, cols: list, col_to_metric_idx: dict) -> dict: +def _extract_for_metrics_only(execution: Execution, cols: list, col_to_metric_idx: dict) -> dict: """ Internal function that extracts data for metrics-only columns when there are no attribute columns. Args: - response (Execution): The execution response to extract data from. + execution (Execution): The execution response to extract data from. cols (list): A list of column names. col_to_metric_idx (dict): A mapping of pandas column names to metric dimension indices. Returns: dict: A dictionary containing the extracted data. """ - exec_def = response.exec_def - result = response.read_result(len(exec_def.metrics)) + exec_def = execution.exec_def + result = execution.read_result(len(exec_def.metrics)) if len(result.data) == 0: return {col: [] for col in cols} @@ -345,7 +345,7 @@ def _typed_result(attributes: list[CatalogAttribute], attribute: Attribute, resu def _extract_from_attributes_and_maybe_metrics( - response: Execution, + execution: Execution, attributes: list[CatalogAttribute], cols: list[str], col_to_attr_idx: dict[str, int], @@ -357,7 +357,7 @@ def _extract_from_attributes_and_maybe_metrics( optionally metrics columns. Args: - response (Execution): The execution response to extract data from. + execution (Execution): The execution response to extract data from. attributes (list[CatalogAttribute]): The catalog of attributes. cols (list[str]): A list of column names. col_to_attr_idx (dict[str, int]): A mapping of pandas column names to attribute dimension indices. @@ -370,11 +370,11 @@ def _extract_from_attributes_and_maybe_metrics( - dict: A dictionary containing the extracted data. - dict: A dictionary containing the extracted index data. """ - exec_def = response.exec_def + exec_def = execution.exec_def offset = [0 for _ in exec_def.dimensions] limit = [len(exec_def.metrics), _RESULT_PAGE_LEN] if exec_def.has_metrics() else [_RESULT_PAGE_LEN] attribute_dim = 1 if exec_def.has_metrics() else 0 - result = response.read_result(limit=limit, offset=offset) + result = execution.read_result(limit=limit, offset=offset) safe_index_to_attr_idx = index_to_attr_idx if index_to_attr_idx is not None else dict() # mappings from column name to Attribute @@ -401,7 +401,7 @@ def _extract_from_attributes_and_maybe_metrics( break offset[attribute_dim] = result.next_page_start(attribute_dim) - result = response.read_result(limit=limit, offset=offset) + result = execution.read_result(limit=limit, offset=offset) return data, index @@ -442,20 +442,20 @@ def compute_and_extract( filter_by=filter_by, ) - response, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result + execution, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result if on_execution_submitted is not None: - on_execution_submitted(response) + on_execution_submitted(execution) - exec_def = response.exec_def + exec_def = execution.exec_def cols = list(columns.keys()) if not exec_def.has_attributes(): - return _extract_for_metrics_only(response, cols, col_to_metric_idx), dict() + return _extract_for_metrics_only(execution, cols, col_to_metric_idx), dict() else: attributes = get_catalog_attributes_for_extract(sdk, workspace_id, exec_def.attributes) return _extract_from_attributes_and_maybe_metrics( - response, + execution, attributes, cols, col_to_attr_idx,