diff --git a/gooddata-pandas/gooddata_pandas/data_access.py b/gooddata-pandas/gooddata_pandas/data_access.py index 89865bc04..df76c1771 100644 --- a/gooddata-pandas/gooddata_pandas/data_access.py +++ b/gooddata-pandas/gooddata_pandas/data_access.py @@ -1,14 +1,14 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Any, Optional, Union +from typing import Any, Callable, Optional, Union from gooddata_sdk import ( Attribute, AttributeFilter, CatalogAttribute, + Execution, ExecutionDefinition, - ExecutionResponse, Filter, GoodDataSdk, Metric, @@ -257,7 +257,7 @@ def _compute( columns: ColumnsDef, index_by: Optional[IndexDef] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, -) -> tuple[ExecutionResponse, dict[str, int], dict[str, int], dict[str, int]]: +) -> tuple[Execution, dict[str, int], dict[str, int], dict[str, int]]: """ Internal function that computes an execution-by-convention to retrieve data for a data frame with the provided columns, optionally indexed by the index_by label and optionally filtered. @@ -271,7 +271,7 @@ def _compute( Returns: tuple: A tuple containing the following elements: - - ExecutionResponse: The execution response. + - Execution: The execution response. - dict[str, int]: A mapping of pandas column names to attribute dimension indices. - dict[str, int]: A mapping of pandas column names to metric dimension indices. - dict[str, int]: A mapping of pandas index names to attribute dimension indices. @@ -299,20 +299,20 @@ def _compute( # -def _extract_for_metrics_only(response: ExecutionResponse, cols: list, col_to_metric_idx: dict) -> dict: +def _extract_for_metrics_only(execution: Execution, cols: list, col_to_metric_idx: dict) -> dict: """ Internal function that extracts data for metrics-only columns when there are no attribute columns. Args: - response (ExecutionResponse): The execution response to extract data from. + execution (Execution): The execution response to extract data from. cols (list): A list of column names. col_to_metric_idx (dict): A mapping of pandas column names to metric dimension indices. Returns: dict: A dictionary containing the extracted data. """ - exec_def = response.exec_def - result = response.read_result(len(exec_def.metrics)) + exec_def = execution.exec_def + result = execution.read_result(len(exec_def.metrics)) if len(result.data) == 0: return {col: [] for col in cols} @@ -345,7 +345,7 @@ def _typed_result(attributes: list[CatalogAttribute], attribute: Attribute, resu def _extract_from_attributes_and_maybe_metrics( - response: ExecutionResponse, + execution: Execution, attributes: list[CatalogAttribute], cols: list[str], col_to_attr_idx: dict[str, int], @@ -357,7 +357,7 @@ def _extract_from_attributes_and_maybe_metrics( optionally metrics columns. Args: - response (ExecutionResponse): The execution response to extract data from. + execution (Execution): The execution response to extract data from. attributes (list[CatalogAttribute]): The catalog of attributes. cols (list[str]): A list of column names. col_to_attr_idx (dict[str, int]): A mapping of pandas column names to attribute dimension indices. @@ -370,11 +370,11 @@ def _extract_from_attributes_and_maybe_metrics( - dict: A dictionary containing the extracted data. - dict: A dictionary containing the extracted index data. """ - exec_def = response.exec_def + exec_def = execution.exec_def offset = [0 for _ in exec_def.dimensions] limit = [len(exec_def.metrics), _RESULT_PAGE_LEN] if exec_def.has_metrics() else [_RESULT_PAGE_LEN] attribute_dim = 1 if exec_def.has_metrics() else 0 - result = response.read_result(limit=limit, offset=offset) + result = execution.read_result(limit=limit, offset=offset) safe_index_to_attr_idx = index_to_attr_idx if index_to_attr_idx is not None else dict() # mappings from column name to Attribute @@ -401,7 +401,7 @@ def _extract_from_attributes_and_maybe_metrics( break offset[attribute_dim] = result.next_page_start(attribute_dim) - result = response.read_result(limit=limit, offset=offset) + result = execution.read_result(limit=limit, offset=offset) return data, index @@ -412,6 +412,7 @@ def compute_and_extract( columns: ColumnsDef, index_by: Optional[IndexDef] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[dict, dict]: """ Convenience function that computes and extracts data from the execution response. @@ -422,14 +423,16 @@ def compute_and_extract( columns (ColumnsDef): The columns definition. index_by (Optional[IndexDef]): The index definition, if any. filter_by (Optional[Union[Filter, list[Filter]]]): A filter or a list of filters, if any. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: tuple: A tuple containing the following dictionaries: - dict: A dictionary with data for each column in `columns`. - dict: A dictionary with data for constructing index(es) for each index in index_by. - Note: For convenience it is possible to pass just single index. in that case the index dict will contain exactly - one key of '0' (just get first value from dict when consuming the result). + Note: For convenience, it is possible to pass just a single index. In that case, the index dict will contain exactly + one key of '0' (just get the first value from dict when consuming the result). """ result = _compute( sdk=sdk, @@ -439,17 +442,20 @@ def compute_and_extract( filter_by=filter_by, ) - response, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result + execution, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result - exec_def = response.exec_def + if on_execution_submitted is not None: + on_execution_submitted(execution) + + exec_def = execution.exec_def cols = list(columns.keys()) if not exec_def.has_attributes(): - return _extract_for_metrics_only(response, cols, col_to_metric_idx), dict() + return _extract_for_metrics_only(execution, cols, col_to_metric_idx), dict() else: attributes = get_catalog_attributes_for_extract(sdk, workspace_id, exec_def.attributes) return _extract_from_attributes_and_maybe_metrics( - response, + execution, attributes, cols, col_to_attr_idx, diff --git a/gooddata-pandas/gooddata_pandas/dataframe.py b/gooddata-pandas/gooddata_pandas/dataframe.py index 8045baa92..d90ae14da 100644 --- a/gooddata-pandas/gooddata_pandas/dataframe.py +++ b/gooddata-pandas/gooddata_pandas/dataframe.py @@ -1,13 +1,14 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Optional, Union +from typing import Callable, Optional, Union import pandas from gooddata_api_client import models from gooddata_sdk import ( Attribute, BareExecutionResponse, + Execution, ExecutionDefinition, Filter, GoodDataSdk, @@ -68,19 +69,25 @@ def __init__(self, sdk: GoodDataSdk, workspace_id: str) -> None: self._workspace_id = workspace_id def indexed( - self, index_by: IndexDef, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None + self, + index_by: IndexDef, + columns: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame indexed by values of the label. The data frame columns will be created from either metrics or other label values. - Note that depending on composition of the labels, the DataFrame's index may or may not be unique. + Note that depending on the composition of the labels, the DataFrame's index may or may not be unique. Args: index_by (IndexDef): One or more labels to index by. columns (ColumnsDef): Dictionary mapping column name to its definition. filter_by (Optional[Union[Filter, list[Filter]]]): Optional filters to apply during computation on the server. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -91,6 +98,7 @@ def indexed( columns=columns, index_by=index_by, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) _idx = make_pandas_index(index) @@ -98,7 +106,10 @@ def indexed( return pandas.DataFrame(data=data, index=_idx) def not_indexed( - self, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None + self, + columns: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame with columns created from metrics and or labels. @@ -107,21 +118,33 @@ def not_indexed( columns (ColumnsDef): Dictionary mapping column name to its definition. filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during computation on the server. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. """ - data, _ = compute_and_extract(self._sdk, self._workspace_id, columns=columns, filter_by=filter_by) + data, _ = compute_and_extract( + self._sdk, + self._workspace_id, + columns=columns, + filter_by=filter_by, + on_execution_submitted=on_execution_submitted, + ) return pandas.DataFrame(data=data) def for_items( - self, items: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None, auto_index: bool = True + self, + items: ColumnsDef, + filter_by: Optional[Union[Filter, list[Filter]]] = None, + auto_index: bool = True, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.DataFrame: """ Creates a data frame for named items. This is a convenience method that will create DataFrame with or - without index based on the context of the items that you pass. + without an index based on the context of the items that you pass. Args: items (ColumnsDef): Dictionary mapping item name to its definition. @@ -129,6 +152,8 @@ def for_items( on the server. auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents of the items. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -157,9 +182,15 @@ def for_items( index_by=resolved_attr_cols, columns=resolved_measure_cols, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) - def for_visualization(self, visualization_id: str, auto_index: bool = True) -> pandas.DataFrame: + def for_visualization( + self, + visualization_id: str, + auto_index: bool = True, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, + ) -> pandas.DataFrame: """ Creates a data frame with columns based on the content of the visualization with the provided identifier. @@ -167,6 +198,8 @@ def for_visualization(self, visualization_id: str, auto_index: bool = True) -> p visualization_id (str): Visualization identifier. auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents of the visualization. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. @@ -181,22 +214,31 @@ def for_visualization(self, visualization_id: str, auto_index: bool = True) -> p **{naming.col_name_for_metric(m): m.as_computable() for m in visualization.metrics}, } - return self.for_items(columns, filter_by=filter_by, auto_index=auto_index) + return self.for_items( + columns, filter_by=filter_by, auto_index=auto_index, on_execution_submitted=on_execution_submitted + ) def for_created_visualization( - self, created_visualizations_response: dict + self, + created_visualizations_response: dict, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[pandas.DataFrame, DataFrameMetadata]: """ Creates a data frame using a created visualization. Args: created_visualizations_response (dict): Created visualization response. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.DataFrame: A DataFrame instance. """ execution_definition = self._sdk.compute.build_exec_def_from_chat_result(created_visualizations_response) - return self.for_exec_def(exec_def=execution_definition) + return self.for_exec_def( + exec_def=execution_definition, + on_execution_submitted=on_execution_submitted, + ) def result_cache_metadata_for_exec_result_id(self, result_id: str) -> ResultCacheMetadata: """ @@ -217,6 +259,7 @@ def for_exec_def( result_size_dimensions_limits: ResultSizeDimensions = (), result_size_bytes_limit: Optional[int] = None, page_size: int = _DEFAULT_PAGE_SIZE, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> tuple[pandas.DataFrame, DataFrameMetadata]: """ Creates a data frame using an execution definition. @@ -247,6 +290,8 @@ def for_exec_def( result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions. result_size_bytes_limit (Optional[int]): Maximum size of result in bytes. page_size (int): Number of records per page. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata. @@ -257,6 +302,9 @@ def for_exec_def( execution = self._sdk.compute.for_exec_def(workspace_id=self._workspace_id, exec_def=exec_def) result_cache_metadata = self.result_cache_metadata_for_exec_result_id(execution.result_id) + if on_execution_submitted is not None: + on_execution_submitted(execution) + return convert_execution_response_to_dataframe( execution_response=execution.bare_exec_response, result_cache_metadata=result_cache_metadata, @@ -302,7 +350,7 @@ def for_exec_result_id( label_overrides (Optional[LabelOverrides]): Label overrides for metrics and attributes. result_cache_metadata (Optional[ResultCacheMetadata]): Cache metadata for the execution result. result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions. - result_size_bytes_limit (Optional[int]): Maximum size of result in bytes. + result_size_bytes_limit (Optional[int]): Maximum size of the result in bytes. use_local_ids_in_headers (bool): Use local identifier in headers. use_primary_labels_in_attributes (bool): Use primary labels in attributes. page_size (int): Number of records per page. diff --git a/gooddata-pandas/gooddata_pandas/series.py b/gooddata-pandas/gooddata_pandas/series.py index 4d84e95ba..d47cc214f 100644 --- a/gooddata-pandas/gooddata_pandas/series.py +++ b/gooddata-pandas/gooddata_pandas/series.py @@ -1,10 +1,10 @@ # (C) 2021 GoodData Corporation from __future__ import annotations -from typing import Optional, Union +from typing import Callable, Optional, Union import pandas -from gooddata_sdk import Attribute, Filter, GoodDataSdk, ObjId, SimpleMetric +from gooddata_sdk import Attribute, Execution, Filter, GoodDataSdk, ObjId, SimpleMetric from gooddata_pandas.data_access import compute_and_extract from gooddata_pandas.utils import IndexDef, LabelItemDef, make_pandas_index @@ -28,6 +28,7 @@ def indexed( index_by: IndexDef, data_by: Union[SimpleMetric, str, ObjId, Attribute], filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.Series: """Creates pandas Series from data points calculated from a single `data_by`. @@ -61,6 +62,9 @@ def indexed( - object identifier: ``ObjId(id='some_label_id', type='')`` - Attribute or Metric depending on type of filter + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. + Returns: pandas.Series: pandas series instance """ @@ -71,6 +75,7 @@ def indexed( index_by=index_by, columns={"_series": data_by}, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) _idx = make_pandas_index(index) @@ -82,6 +87,7 @@ def not_indexed( data_by: Union[SimpleMetric, str, ObjId, Attribute], granularity: Optional[Union[list[LabelItemDef], IndexDef]] = None, filter_by: Optional[Union[Filter, list[Filter]]] = None, + on_execution_submitted: Optional[Callable[[Execution], None]] = None, ) -> pandas.Series: """ Creates a pandas.Series from data points calculated from a single `data_by` without constructing an index. @@ -108,6 +114,8 @@ def not_indexed( - ObjId: ObjId(id='some_label_id', type='') - Attribute or Metric depending on the type of filter Defaults to None. + on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was + submitted to the backend. Returns: pandas.Series: The resulting pandas Series instance. @@ -124,6 +132,7 @@ def not_indexed( index_by=_index, columns={"_series": data_by}, filter_by=filter_by, + on_execution_submitted=on_execution_submitted, ) return pandas.Series(data=data["_series"])