Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gooddata-pipelines/TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ A list of outstanding tasks, features, or technical debt to be addressed in this

- [ ] Integrate with GoodDataApiClient
- [ ] Consider replacing the SdkMethods wrapper with direct calls to the SDK methods
- [ ] Consider using orjson library instead of json
- [ ] Consider using orjson library instead of json to load test data
- [ ] Cleanup custom exceptions
- [ ] Improve test coverage. Write missing unit tests for legacy code (e.g., user data filters)

Expand Down
8 changes: 7 additions & 1 deletion gooddata-pipelines/gooddata_pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
UserDataFilterProvisioner,
)
from .provisioning.entities.users.models.permissions import (
EntityType,
PermissionFullLoad,
PermissionIncrementalLoad,
)
Expand All @@ -33,7 +34,10 @@
from .provisioning.entities.users.permissions import PermissionProvisioner
from .provisioning.entities.users.user_groups import UserGroupProvisioner
from .provisioning.entities.users.users import UserProvisioner
from .provisioning.entities.workspaces.models import WorkspaceFullLoad
from .provisioning.entities.workspaces.models import (
WorkspaceFullLoad,
WorkspaceIncrementalLoad,
)
from .provisioning.entities.workspaces.workspace import WorkspaceProvisioner

__all__ = [
Expand All @@ -52,8 +56,10 @@
"UserGroupFullLoad",
"UserProvisioner",
"UserGroupProvisioner",
"WorkspaceIncrementalLoad",
"PermissionProvisioner",
"UserDataFilterProvisioner",
"UserDataFilterFullLoad",
"EntityType",
"__version__",
]
54 changes: 0 additions & 54 deletions gooddata-pipelines/gooddata_pipelines/api/gooddata_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@

import requests

# TODO: Limit the use of "typing.Any". Improve readability by using either models
# or typed dicts.

TIMEOUT = 60
REQUEST_PAGE_SIZE = 250
API_VERSION = "v1"
Expand Down Expand Up @@ -55,42 +52,6 @@ def _get_url(self, endpoint: str) -> str:
"""
return f"{self.base_url}{endpoint}"

def get_custom_application_setting(
self, workspace_id: str, setting_id: str
) -> requests.Response:
"""Gets a custom application setting.

Args:
workspace_id (str): The ID of the workspace.
setting_id (str): The ID of the custom application setting.
Returns:
requests.Response: The response from the server containing the
custom application setting.
"""
url = f"/entities/workspaces/{workspace_id}/customApplicationSettings/{setting_id}"
return self._get(url)

def put_custom_application_setting(
self, workspace_id: str, setting_id: str, data: dict[str, Any]
) -> requests.Response:
url = f"/entities/workspaces/{workspace_id}/customApplicationSettings/{setting_id}"
return self._put(url, data, self.headers)

def post_custom_application_setting(
self, workspace_id: str, data: dict[str, Any]
) -> requests.Response:
"""Creates a custom application setting for a given workspace.

Args:
workspace_id (str): The ID of the workspace.
data (dict[str, Any]): The data for the custom application setting.
Returns:
requests.Response: The response from the server containing the
created custom application setting.
"""
url = f"/entities/workspaces/{workspace_id}/customApplicationSettings/"
return self._post(url, data, self.headers)

def get_all_workspace_data_filters(
self, workspace_id: str
) -> requests.Response:
Expand Down Expand Up @@ -201,21 +162,6 @@ def delete_workspace_data_filter_setting(
endpoint,
)

def post_workspace_data_filter(
self, workspace_id: str, data: dict[str, Any]
) -> requests.Response:
"""Creates a workspace data filter for a given workspace.

Args:
workspace_id (str): The ID of the workspace.
data (dict[str, Any]): The data for the workspace data filter.
Returns:
requests.Response: The response from the server containing the
created workspace data filter.
"""
endpoint = f"/entities/workspaces/{workspace_id}/workspaceDataFilters"
return self._post(endpoint, data, self.headers)

def get_user_data_filters(self, workspace_id: str) -> requests.Response:
"""Gets the user data filters for a given workspace."""
endpoint = f"/layout/workspaces/{workspace_id}/userDataFilters"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# (C) 2025 GoodData Corporation
from abc import abstractmethod

from enum import Enum
from typing import Any, Iterator, TypeAlias, TypeVar
from typing import Iterator, TypeAlias

import attrs
from gooddata_sdk.catalog.identifier import CatalogAssigneeIdentifier
Expand All @@ -14,85 +14,29 @@
from gooddata_pipelines.provisioning.utils.exceptions import BaseUserException

TargetsPermissionDict: TypeAlias = dict[str, dict[str, bool]]
ConstructorType = TypeVar("ConstructorType", bound="ConstructorMixin")


class PermissionType(str, Enum):
class EntityType(str, Enum):
# NOTE: Start using StrEnum with Python 3.11
user = "user"
user_group = "userGroup"


class ConstructorMixin:
@staticmethod
def _get_id_and_type(
permission: dict[str, Any],
) -> tuple[str, PermissionType]:
user_id: str | None = permission.get("user_id")
user_group_id: str | None = permission.get("ug_id")
if user_id and user_group_id:
raise ValueError("Only one of user_id or ug_id must be present")
elif user_id:
return user_id, PermissionType.user
elif user_group_id:
return user_group_id, PermissionType.user_group
else:
raise ValueError("Either user_id or ug_id must be present")

@classmethod
def from_list_of_dicts(
cls: type[ConstructorType], data: list[dict[str, Any]]
) -> list[ConstructorType]:
"""Creates a list of instances from list of dicts."""
# NOTE: We can use typing.Self for the return type in Python 3.11
permissions = []
for permission in data:
permissions.append(cls.from_dict(permission))
return permissions

@classmethod
@abstractmethod
def from_dict(cls, data: dict[str, Any]) -> Any:
"""Construction form a dictionary to be implemented by subclasses."""
pass


class PermissionIncrementalLoad(BaseModel, ConstructorMixin):
class BasePermission(BaseModel):
permission: str
workspace_id: str
id_: str
type_: PermissionType
is_active: bool
entity_id: str
entity_type: EntityType

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "PermissionIncrementalLoad":
"""Returns an instance of PermissionIncrementalLoad from a dictionary."""
id_, target_type = cls._get_id_and_type(data)
return cls(
permission=data["ws_permissions"],
workspace_id=data["ws_id"],
id_=id_,
type_=target_type,
is_active=data["is_active"],
)

class PermissionFullLoad(BasePermission):
"""Input validator for full load of workspace permissions provisioning."""

class PermissionFullLoad(BaseModel, ConstructorMixin):
permission: str
workspace_id: str
id_: str
type_: PermissionType

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "PermissionFullLoad":
"""Returns an instance of PermissionFullLoad from a dictionary."""
id_, target_type = cls._get_id_and_type(data)
return cls(
permission=data["ws_permissions"],
workspace_id=data["ws_id"],
id_=id_,
type_=target_type,
)
class PermissionIncrementalLoad(BasePermission):
"""Input validator for incremental load of workspace permissions provisioning."""

is_active: bool


@attrs.define
Expand All @@ -117,7 +61,7 @@ def from_sdk_api(
permission.assignee.id,
)

if permission_type == PermissionType.user.value:
if permission_type == EntityType.user.value:
target_dict = users
else:
target_dict = user_groups
Expand Down Expand Up @@ -170,7 +114,7 @@ def to_sdk_api(self) -> CatalogDeclarativeWorkspacePermissions:

for user_id, permissions in self.users.items():
assignee = CatalogAssigneeIdentifier(
id=user_id, type=PermissionType.user.value
id=user_id, type=EntityType.user.value
)
for declaration in self._permissions_for_target(
permissions, assignee
Expand All @@ -179,7 +123,7 @@ def to_sdk_api(self) -> CatalogDeclarativeWorkspacePermissions:

for ug_id, permissions in self.user_groups.items():
assignee = CatalogAssigneeIdentifier(
id=ug_id, type=PermissionType.user_group.value
id=ug_id, type=EntityType.user_group.value
)
for declaration in self._permissions_for_target(
permissions, assignee
Expand All @@ -200,15 +144,15 @@ def add_incremental_permission(
"""
target_dict = (
self.users
if permission.type_ == PermissionType.user
if permission.entity_type == EntityType.user
else self.user_groups
)

if permission.id_ not in target_dict:
target_dict[permission.id_] = {}
if permission.entity_id not in target_dict:
target_dict[permission.entity_id] = {}

is_active = permission.is_active
target_permissions = target_dict[permission.id_]
target_permissions = target_dict[permission.entity_id]
permission_value = permission.permission

if permission_value not in target_permissions:
Expand All @@ -233,14 +177,14 @@ def add_full_load_permission(self, permission: PermissionFullLoad) -> None:
"""
target_dict = (
self.users
if permission.type_ == PermissionType.user
if permission.entity_type == EntityType.user
else self.user_groups
)

if permission.id_ not in target_dict:
target_dict[permission.id_] = {}
if permission.entity_id not in target_dict:
target_dict[permission.entity_id] = {}

target_permissions = target_dict[permission.id_]
target_permissions = target_dict[permission.entity_id]
permission_value = permission.permission

if permission_value not in target_permissions:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,64 +1,37 @@
# (C) 2025 GoodData Corporation

from typing import Any
from pydantic import BaseModel, Field, ValidationInfo, field_validator

from pydantic import BaseModel

from gooddata_pipelines.provisioning.utils.utils import SplitMixin


class BaseUserGroup(BaseModel, SplitMixin):
class UserGroupBase(BaseModel):
user_group_id: str
user_group_name: str
parent_user_groups: list[str]
parent_user_groups: list[str] = Field(default_factory=list)

@field_validator("user_group_name", mode="before")
@classmethod
def _create_from_dict_data(
cls, user_group_data: dict[str, Any], delimiter: str = ","
) -> dict[str, Any]:
"""Helper method to extract common data from dict."""
parent_user_groups = cls.split(
user_group_data["parent_user_groups"], delimiter=delimiter
)
user_group_name = user_group_data["user_group_name"]
if not user_group_name:
user_group_name = user_group_data["user_group_id"]

return {
"user_group_id": user_group_data["user_group_id"],
"user_group_name": user_group_name,
"parent_user_groups": parent_user_groups,
}


class UserGroupIncrementalLoad(BaseUserGroup):
is_active: bool

def validate_user_group_name(
cls, v: str | None, info: ValidationInfo
) -> str:
"""If user_group_name is None or empty, default to user_group_id."""
if not v: # handles None and empty string
return info.data.get("user_group_id", "")
return v

@field_validator("parent_user_groups", mode="before")
@classmethod
def from_list_of_dicts(
cls, data: list[dict[str, Any]], delimiter: str = ","
) -> list["UserGroupIncrementalLoad"]:
"""Creates a list of User objects from list of dicts."""
user_groups = []
for user_group in data:
base_data = cls._create_from_dict_data(user_group, delimiter)
base_data["is_active"] = user_group["is_active"]
def validate_parent_user_groups(cls, v: list[str] | None) -> list[str]:
"""If parent_user_groups is None or empty, default to empty list."""
if not v:
return []
return v

user_groups.append(UserGroupIncrementalLoad(**base_data))

return user_groups
class UserGroupFullLoad(UserGroupBase):
"""Input validator for full load of user group provisioning."""


class UserGroupFullLoad(BaseUserGroup):
@classmethod
def from_list_of_dicts(
cls, data: list[dict[str, Any]], delimiter: str = ","
) -> list["UserGroupFullLoad"]:
"""Creates a list of User objects from list of dicts."""
user_groups = []
for user_group in data:
base_data = cls._create_from_dict_data(user_group, delimiter)
class UserGroupIncrementalLoad(UserGroupBase):
"""Input validator for incremental load of user group provisioning."""

user_groups.append(UserGroupFullLoad(**base_data))

return user_groups
is_active: bool
Loading
Loading