Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions gooddata-pandas/gooddata_pandas/data_access.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# (C) 2021 GoodData Corporation
from __future__ import annotations

from typing import Any, Optional, Union
from typing import Any, Callable, Optional, Union

from gooddata_sdk import (
Attribute,
AttributeFilter,
CatalogAttribute,
Execution,
ExecutionDefinition,
ExecutionResponse,
Filter,
GoodDataSdk,
Metric,
Expand Down Expand Up @@ -257,7 +257,7 @@ def _compute(
columns: ColumnsDef,
index_by: Optional[IndexDef] = None,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
) -> tuple[ExecutionResponse, dict[str, int], dict[str, int], dict[str, int]]:
) -> tuple[Execution, dict[str, int], dict[str, int], dict[str, int]]:
"""
Internal function that computes an execution-by-convention to retrieve data for a data frame with the provided
columns, optionally indexed by the index_by label and optionally filtered.
Expand All @@ -271,7 +271,7 @@ def _compute(

Returns:
tuple: A tuple containing the following elements:
- ExecutionResponse: The execution response.
- Execution: The submitted execution.
- dict[str, int]: A mapping of pandas column names to attribute dimension indices.
- dict[str, int]: A mapping of pandas column names to metric dimension indices.
- dict[str, int]: A mapping of pandas index names to attribute dimension indices.
Expand Down Expand Up @@ -299,20 +299,20 @@ def _compute(
#


def _extract_for_metrics_only(response: ExecutionResponse, cols: list, col_to_metric_idx: dict) -> dict:
def _extract_for_metrics_only(execution: Execution, cols: list, col_to_metric_idx: dict) -> dict:
"""
Internal function that extracts data for metrics-only columns when there are no attribute columns.

Args:
response (ExecutionResponse): The execution response to extract data from.
execution (Execution): The execution to extract data from.
cols (list): A list of column names.
col_to_metric_idx (dict): A mapping of pandas column names to metric dimension indices.

Returns:
dict: A dictionary containing the extracted data.
"""
exec_def = response.exec_def
result = response.read_result(len(exec_def.metrics))
exec_def = execution.exec_def
result = execution.read_result(len(exec_def.metrics))
if len(result.data) == 0:
return {col: [] for col in cols}

Expand Down Expand Up @@ -345,7 +345,7 @@ def _typed_result(attributes: list[CatalogAttribute], attribute: Attribute, resu


def _extract_from_attributes_and_maybe_metrics(
response: ExecutionResponse,
execution: Execution,
attributes: list[CatalogAttribute],
cols: list[str],
col_to_attr_idx: dict[str, int],
Expand All @@ -357,7 +357,7 @@ def _extract_from_attributes_and_maybe_metrics(
optionally metrics columns.

Args:
response (ExecutionResponse): The execution response to extract data from.
execution (Execution): The execution to extract data from.
attributes (list[CatalogAttribute]): The catalog of attributes.
cols (list[str]): A list of column names.
col_to_attr_idx (dict[str, int]): A mapping of pandas column names to attribute dimension indices.
Expand All @@ -370,11 +370,11 @@ def _extract_from_attributes_and_maybe_metrics(
- dict: A dictionary containing the extracted data.
- dict: A dictionary containing the extracted index data.
"""
exec_def = response.exec_def
exec_def = execution.exec_def
offset = [0 for _ in exec_def.dimensions]
limit = [len(exec_def.metrics), _RESULT_PAGE_LEN] if exec_def.has_metrics() else [_RESULT_PAGE_LEN]
attribute_dim = 1 if exec_def.has_metrics() else 0
result = response.read_result(limit=limit, offset=offset)
result = execution.read_result(limit=limit, offset=offset)
safe_index_to_attr_idx = index_to_attr_idx if index_to_attr_idx is not None else dict()

# mappings from column name to Attribute
Expand All @@ -401,7 +401,7 @@ def _extract_from_attributes_and_maybe_metrics(
break

offset[attribute_dim] = result.next_page_start(attribute_dim)
result = response.read_result(limit=limit, offset=offset)
result = execution.read_result(limit=limit, offset=offset)

return data, index

Expand All @@ -412,6 +412,7 @@ def compute_and_extract(
columns: ColumnsDef,
index_by: Optional[IndexDef] = None,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> tuple[dict, dict]:
"""
Convenience function that computes and extracts data from the execution response.
Expand All @@ -422,14 +423,16 @@ def compute_and_extract(
columns (ColumnsDef): The columns definition.
index_by (Optional[IndexDef]): The index definition, if any.
filter_by (Optional[Union[Filter, list[Filter]]]): A filter or a list of filters, if any.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
tuple: A tuple containing the following dictionaries:
- dict: A dictionary with data for each column in `columns`.
- dict: A dictionary with data for constructing index(es) for each index in index_by.

Note: For convenience it is possible to pass just single index. in that case the index dict will contain exactly
one key of '0' (just get first value from dict when consuming the result).
Note: For convenience, it is possible to pass just a single index. In that case, the index dict will contain exactly
one key of '0' (just get the first value from dict when consuming the result).
"""
result = _compute(
sdk=sdk,
Expand All @@ -439,17 +442,20 @@ def compute_and_extract(
filter_by=filter_by,
)

response, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result
execution, col_to_attr_idx, col_to_metric_idx, index_to_attr_idx = result

exec_def = response.exec_def
if on_execution_submitted is not None:
on_execution_submitted(execution)

exec_def = execution.exec_def
cols = list(columns.keys())

if not exec_def.has_attributes():
return _extract_for_metrics_only(response, cols, col_to_metric_idx), dict()
return _extract_for_metrics_only(execution, cols, col_to_metric_idx), dict()
else:
attributes = get_catalog_attributes_for_extract(sdk, workspace_id, exec_def.attributes)
return _extract_from_attributes_and_maybe_metrics(
response,
execution,
attributes,
cols,
col_to_attr_idx,
Expand Down
72 changes: 60 additions & 12 deletions gooddata-pandas/gooddata_pandas/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# (C) 2021 GoodData Corporation
from __future__ import annotations

from typing import Optional, Union
from typing import Callable, Optional, Union

import pandas
from gooddata_api_client import models
from gooddata_sdk import (
Attribute,
BareExecutionResponse,
Execution,
ExecutionDefinition,
Filter,
GoodDataSdk,
Expand Down Expand Up @@ -68,19 +69,25 @@ def __init__(self, sdk: GoodDataSdk, workspace_id: str) -> None:
self._workspace_id = workspace_id

def indexed(
self, index_by: IndexDef, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None
self,
index_by: IndexDef,
columns: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> pandas.DataFrame:
"""
Creates a data frame indexed by values of the label. The data frame columns will be created from either
metrics or other label values.

Note that depending on composition of the labels, the DataFrame's index may or may not be unique.
Note that depending on the composition of the labels, the DataFrame's index may or may not be unique.

Args:
index_by (IndexDef): One or more labels to index by.
columns (ColumnsDef): Dictionary mapping column name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]):
Optional filters to apply during computation on the server.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
pandas.DataFrame: A DataFrame instance.
Expand All @@ -91,14 +98,18 @@ def indexed(
columns=columns,
index_by=index_by,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
)

_idx = make_pandas_index(index)

return pandas.DataFrame(data=data, index=_idx)

def not_indexed(
self, columns: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None
self,
columns: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> pandas.DataFrame:
"""
Creates a data frame with columns created from metrics and/or labels.
Expand All @@ -107,28 +118,42 @@ def not_indexed(
columns (ColumnsDef): Dictionary mapping column name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during
computation on the server.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
pandas.DataFrame: A DataFrame instance.
"""

data, _ = compute_and_extract(self._sdk, self._workspace_id, columns=columns, filter_by=filter_by)
data, _ = compute_and_extract(
self._sdk,
self._workspace_id,
columns=columns,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
)

return pandas.DataFrame(data=data)

def for_items(
self, items: ColumnsDef, filter_by: Optional[Union[Filter, list[Filter]]] = None, auto_index: bool = True
self,
items: ColumnsDef,
filter_by: Optional[Union[Filter, list[Filter]]] = None,
auto_index: bool = True,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> pandas.DataFrame:
"""
Creates a data frame for named items. This is a convenience method that will create DataFrame with or
without index based on the context of the items that you pass.
without an index based on the context of the items that you pass.

Args:
items (ColumnsDef): Dictionary mapping item name to its definition.
filter_by (Optional[Union[Filter, list[Filter]]]): Optionally specify filters to apply during computation
on the server.
auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents
of the items.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
pandas.DataFrame: A DataFrame instance.
Expand Down Expand Up @@ -157,16 +182,24 @@ def for_items(
index_by=resolved_attr_cols,
columns=resolved_measure_cols,
filter_by=filter_by,
on_execution_submitted=on_execution_submitted,
)

def for_visualization(self, visualization_id: str, auto_index: bool = True) -> pandas.DataFrame:
def for_visualization(
self,
visualization_id: str,
auto_index: bool = True,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> pandas.DataFrame:
"""
Creates a data frame with columns based on the content of the visualization with the provided identifier.

Args:
visualization_id (str): Visualization identifier.
auto_index (bool): Default True. Enables creation of DataFrame with index depending on the contents
of the visualization.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
pandas.DataFrame: A DataFrame instance.
Expand All @@ -181,22 +214,31 @@ def for_visualization(self, visualization_id: str, auto_index: bool = True) -> p
**{naming.col_name_for_metric(m): m.as_computable() for m in visualization.metrics},
}

return self.for_items(columns, filter_by=filter_by, auto_index=auto_index)
return self.for_items(
columns, filter_by=filter_by, auto_index=auto_index, on_execution_submitted=on_execution_submitted
)

def for_created_visualization(
self, created_visualizations_response: dict
self,
created_visualizations_response: dict,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> tuple[pandas.DataFrame, DataFrameMetadata]:
"""
Creates a data frame using a created visualization.

Args:
created_visualizations_response (dict): Created visualization response.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata.
"""
execution_definition = self._sdk.compute.build_exec_def_from_chat_result(created_visualizations_response)
return self.for_exec_def(exec_def=execution_definition)
return self.for_exec_def(
exec_def=execution_definition,
on_execution_submitted=on_execution_submitted,
)

def result_cache_metadata_for_exec_result_id(self, result_id: str) -> ResultCacheMetadata:
"""
Expand All @@ -217,6 +259,7 @@ def for_exec_def(
result_size_dimensions_limits: ResultSizeDimensions = (),
result_size_bytes_limit: Optional[int] = None,
page_size: int = _DEFAULT_PAGE_SIZE,
on_execution_submitted: Optional[Callable[[Execution], None]] = None,
) -> tuple[pandas.DataFrame, DataFrameMetadata]:
"""
Creates a data frame using an execution definition.
Expand Down Expand Up @@ -247,6 +290,8 @@ def for_exec_def(
result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions.
result_size_bytes_limit (Optional[int]): Maximum size of result in bytes.
page_size (int): Number of records per page.
on_execution_submitted (Optional[Callable[[Execution], None]]): Callback to call when the execution was
submitted to the backend.

Returns:
Tuple[pandas.DataFrame, DataFrameMetadata]: Tuple holding DataFrame and DataFrame metadata.
Expand All @@ -257,6 +302,9 @@ def for_exec_def(
execution = self._sdk.compute.for_exec_def(workspace_id=self._workspace_id, exec_def=exec_def)
result_cache_metadata = self.result_cache_metadata_for_exec_result_id(execution.result_id)

if on_execution_submitted is not None:
on_execution_submitted(execution)

return convert_execution_response_to_dataframe(
execution_response=execution.bare_exec_response,
result_cache_metadata=result_cache_metadata,
Expand Down Expand Up @@ -302,7 +350,7 @@ def for_exec_result_id(
label_overrides (Optional[LabelOverrides]): Label overrides for metrics and attributes.
result_cache_metadata (Optional[ResultCacheMetadata]): Cache metadata for the execution result.
result_size_dimensions_limits (ResultSizeDimensions): A tuple containing maximum size of result dimensions.
result_size_bytes_limit (Optional[int]): Maximum size of result in bytes.
result_size_bytes_limit (Optional[int]): Maximum size of the result in bytes.
use_local_ids_in_headers (bool): Use local identifier in headers.
use_primary_labels_in_attributes (bool): Use primary labels in attributes.
page_size (int): Number of records per page.
Expand Down
Loading
Loading