Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,103 +1,101 @@
# (C) 2025 GoodData Corporation
from dataclasses import dataclass
from abc import abstractmethod
from enum import Enum
from typing import Any, Iterator, TypeAlias
from typing import Any, Iterator, TypeAlias, TypeVar

import attrs
from gooddata_sdk.catalog.identifier import CatalogAssigneeIdentifier
from gooddata_sdk.catalog.permission.declarative_model.permission import (
CatalogDeclarativeSingleWorkspacePermission,
CatalogDeclarativeWorkspacePermissions,
)
from pydantic import BaseModel

from gooddata_pipelines.provisioning.utils.exceptions import BaseUserException

# TODO: refactor the full load and incremental load models to reuse as much as possible
# TODO: use pydantic models instead of dataclasses?
# TODO: make the validation logic more readable (as in PermissionIncrementalLoad)

TargetsPermissionDict: TypeAlias = dict[str, dict[str, bool]]
ConstructorType = TypeVar("ConstructorType", bound="ConstructorMixin")


class PermissionType(Enum):
class PermissionType(str, Enum):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just add a comment, please, that with 3.11 we should start using StrEnum

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup. Made a similar comment for the return type on L45, with 3.11 it could be made simpler with typing.Self

# NOTE: Start using StrEnum with Python 3.11
user = "user"
user_group = "userGroup"


@dataclass(frozen=True)
class PermissionIncrementalLoad:
permission: str
workspace_id: str
id: str
type: PermissionType
is_active: bool
class ConstructorMixin:
@staticmethod
def _get_id_and_type(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there an underscore at the beginning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a Python convention for "private" methods - it's not enforced at runtime, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know but the combination with @staticmethod seems a little bit strange to me.

permission: dict[str, Any],
) -> tuple[str, PermissionType]:
user_id: str | None = permission.get("user_id")
user_group_id: str | None = permission.get("ug_id")
if user_id and user_group_id:
raise ValueError("Only one of user_id or ug_id must be present")
elif user_id:
return user_id, PermissionType.user
elif user_group_id:
return user_group_id, PermissionType.user_group
else:
raise ValueError("Either user_id or ug_id must be present")

@classmethod
def from_list_of_dicts(
cls, data: list[dict[str, Any]]
) -> list["PermissionIncrementalLoad"]:
"""Creates a list of User objects from list of dicts."""
id: str
cls: type[ConstructorType], data: list[dict[str, Any]]
) -> list[ConstructorType]:
"""Creates a list of instances from list of dicts."""
# NOTE: We can use typing.Self for the return type in Python 3.11
permissions = []
for permission in data:
user_id: str | None = permission.get("user_id")
user_group_id: str | None = permission.get("ug_id")

if user_id is not None:
target_type = PermissionType.user
id = user_id
elif user_group_id is not None:
target_type = PermissionType.user_group
id = user_group_id

permissions.append(
PermissionIncrementalLoad(
permission=permission["ws_permissions"],
workspace_id=permission["ws_id"],
id=id,
type=target_type,
is_active=str(permission["is_active"]).lower() == "true",
)
)
permissions.append(cls.from_dict(permission))
return permissions

@classmethod
@abstractmethod
def from_dict(cls, data: dict[str, Any]) -> Any:
"""Construction form a dictionary to be implemented by subclasses."""
pass


@dataclass(frozen=True)
class PermissionFullLoad:
class PermissionIncrementalLoad(BaseModel, ConstructorMixin):
permission: str
workspace_id: str
id: str
type: PermissionType
id_: str
type_: PermissionType
is_active: bool

@classmethod
def from_list_of_dicts(
cls, data: list[dict[str, Any]]
) -> list["PermissionFullLoad"]:
"""Creates a list of User objects from list of dicts."""
permissions = []
for permission in data:
id = (
permission["user_id"]
if permission["user_id"]
else permission["ug_id"]
)
def from_dict(cls, data: dict[str, Any]) -> "PermissionIncrementalLoad":
"""Returns an instance of PermissionIncrementalLoad from a dictionary."""
id_, target_type = cls._get_id_and_type(data)
return cls(
permission=data["ws_permissions"],
workspace_id=data["ws_id"],
id_=id_,
type_=target_type,
is_active=data["is_active"],
)

if permission["user_id"]:
target_type = PermissionType.user
else:
target_type = PermissionType.user_group

permissions.append(
PermissionFullLoad(
permission=permission["ws_permissions"],
workspace_id=permission["ws_id"],
id=id,
type=target_type,
)
)
return permissions

class PermissionFullLoad(BaseModel, ConstructorMixin):
permission: str
workspace_id: str
id_: str
type_: PermissionType

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "PermissionFullLoad":
"""Returns an instance of PermissionFullLoad from a dictionary."""
id_, target_type = cls._get_id_and_type(data)
return cls(
permission=data["ws_permissions"],
workspace_id=data["ws_id"],
id_=id_,
type_=target_type,
)

@dataclass

@attrs.define
class PermissionDeclaration:
users: TargetsPermissionDict
user_groups: TargetsPermissionDict
Expand Down Expand Up @@ -192,23 +190,25 @@ def to_sdk_api(self) -> CatalogDeclarativeWorkspacePermissions:
permissions=permission_declarations
)

def add_permission(self, permission: PermissionIncrementalLoad) -> None:
def add_incremental_permission(
self, permission: PermissionIncrementalLoad
) -> None:
"""
Adds WSPermission object into respective field within the instance.
Handles duplicate permissions and different combinations of input
and upstream is_active permission states.
"""
target_dict = (
self.users
if permission.type == PermissionType.user
if permission.type_ == PermissionType.user
else self.user_groups
)

if permission.id not in target_dict:
target_dict[permission.id] = {}
if permission.id_ not in target_dict:
target_dict[permission.id_] = {}

is_active = permission.is_active
target_permissions = target_dict[permission.id]
target_permissions = target_dict[permission.id_]
permission_value = permission.permission

if permission_value not in target_permissions:
Expand All @@ -225,6 +225,27 @@ def add_permission(self, permission: PermissionIncrementalLoad) -> None:
)
target_permissions[permission_value] = is_active

def add_full_load_permission(self, permission: PermissionFullLoad) -> None:
"""
Adds WSPermission object into respective field within the instance.
Handles duplicate permissions and different combinations of input
and upstream is_active permission states.
"""
target_dict = (
self.users
if permission.type_ == PermissionType.user
else self.user_groups
)

if permission.id_ not in target_dict:
target_dict[permission.id_] = {}

target_permissions = target_dict[permission.id_]
permission_value = permission.permission

if permission_value not in target_permissions:
target_permissions[permission_value] = True

def upsert(self, other: "PermissionDeclaration") -> None:
"""
Modifies the owner object by merging with the other.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

"""Module for provisioning user permissions in GoodData workspaces."""

from typing import TypeVar

from gooddata_pipelines.api.exceptions import GoodDataApiException
from gooddata_pipelines.provisioning.entities.users.models.permissions import (
PermissionDeclaration,
Expand All @@ -14,6 +16,11 @@
from gooddata_pipelines.provisioning.provisioning import Provisioning
from gooddata_pipelines.provisioning.utils.exceptions import BaseUserException

# Type variable for permission models (PermissionIncrementalLoad or PermissionFullLoad)
PermissionModel = TypeVar(
"PermissionModel", PermissionIncrementalLoad, PermissionFullLoad
)


class PermissionProvisioner(
Provisioning[PermissionFullLoad, PermissionIncrementalLoad]
Expand Down Expand Up @@ -72,7 +79,7 @@ def _get_upstream_declarations(

@staticmethod
def _construct_declarations(
permissions: list[PermissionIncrementalLoad],
permissions: list[PermissionIncrementalLoad] | list[PermissionFullLoad],
) -> WSPermissionsDeclarations:
"""Constructs workspace permission declarations from the input permissions."""
ws_dict: WSPermissionsDeclarations = {}
Expand All @@ -82,33 +89,40 @@ def _construct_declarations(
if ws_id not in ws_dict:
ws_dict[ws_id] = PermissionDeclaration({}, {})

ws_dict[ws_id].add_permission(permission)
if isinstance(permission, PermissionIncrementalLoad):
ws_dict[ws_id].add_incremental_permission(permission)
elif isinstance(permission, PermissionFullLoad):
ws_dict[ws_id].add_full_load_permission(permission)
else:
raise ValueError(f"Invalid permission type: {type(permission)}")
return ws_dict

def _check_user_group_exists(self, ug_id: str) -> None:
"""Checks if user group with provided ID exists."""
self._api._sdk.catalog_user.get_user_group(ug_id)

def _validate_permission(
self, permission: PermissionIncrementalLoad
self, permission: PermissionFullLoad | PermissionIncrementalLoad
) -> None:
"""Validates if the permission is correctly defined."""
if permission.type == PermissionType.user:
self._api.get_user(permission.id, error_message="User not found")
if permission.type_ == PermissionType.user:
self._api.get_user(permission.id_, error_message="User not found")
else:
self._api.get_user_group(
permission.id, error_message="User group not found"
permission.id_, error_message="User group not found"
)

self._api.get_workspace(
permission.workspace_id, error_message="Workspace not found"
)

def _filter_invalid_permissions(
self, permissions: list[PermissionIncrementalLoad]
) -> list[PermissionIncrementalLoad]:
self,
permissions: list[PermissionModel],
) -> list[PermissionModel]:
"""Filters out invalid permissions from the input list."""
valid_permissions: list[PermissionIncrementalLoad] = []
valid_permissions: list[PermissionModel] = []

for permission in permissions:
try:
self._validate_permission(permission)
Expand All @@ -121,13 +135,15 @@ def _filter_invalid_permissions(
valid_permissions.append(permission)
return valid_permissions

def _manage_permissions(
self, permissions: list[PermissionIncrementalLoad]
) -> None:
"""Manages permissions for a list of workspaces.
Modify upstream workspace declarations for each input workspace and skip non-existent ws_ids
def _provision_incremental_load(self) -> None:
"""Provisiones permissions for a list of workspaces.

Modifies existing upstream workspace permission declarations for each
input workspace and skips rest of the workspaces.
"""
valid_permissions = self._filter_invalid_permissions(permissions)
valid_permissions = self._filter_invalid_permissions(
self.source_group_incremental
)

input_declarations = self._construct_declarations(valid_permissions)

Expand All @@ -145,9 +161,21 @@ def _manage_permissions(
self._api.put_declarative_permissions(ws_id, ws_permissions)
self.logger.info(f"Updated permissions for workspace {ws_id}")

def _provision_incremental_load(self) -> None:
"""Provision permissions based on the source group."""
self._manage_permissions(self.source_group_incremental)

def _provision_full_load(self) -> None:
raise NotImplementedError("Not implemented yet.")
"""Provisions permissions for selected of workspaces.

Modifies upstream workspace declarations for each input workspace and
skips non-existent workspace ids. Overwrites any existing configuration
of the workspace permissions.
"""
valid_permissions = self._filter_invalid_permissions(
self.source_group_full
)

input_declarations = self._construct_declarations(valid_permissions)

for ws_id, declaration in input_declarations.items():
ws_permissions = declaration.to_sdk_api()

self._api.put_declarative_permissions(ws_id, ws_permissions)
self.logger.info(f"Updated permissions for workspace {ws_id}")
Loading
Loading