-
Notifications
You must be signed in to change notification settings - Fork 60
SVS 1196 - Full load logic for permissions provisioning #1120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,103 +1,101 @@ | ||
| # (C) 2025 GoodData Corporation | ||
| from dataclasses import dataclass | ||
| from abc import abstractmethod | ||
| from enum import Enum | ||
| from typing import Any, Iterator, TypeAlias | ||
| from typing import Any, Iterator, TypeAlias, TypeVar | ||
|
|
||
| import attrs | ||
| from gooddata_sdk.catalog.identifier import CatalogAssigneeIdentifier | ||
| from gooddata_sdk.catalog.permission.declarative_model.permission import ( | ||
| CatalogDeclarativeSingleWorkspacePermission, | ||
| CatalogDeclarativeWorkspacePermissions, | ||
| ) | ||
| from pydantic import BaseModel | ||
|
|
||
| from gooddata_pipelines.provisioning.utils.exceptions import BaseUserException | ||
|
|
||
| # TODO: refactor the full load and incremental load models to reuse as much as possible | ||
| # TODO: use pydantic models instead of dataclasses? | ||
| # TODO: make the validation logic more readable (as in PermissionIncrementalLoad) | ||
|
|
||
| TargetsPermissionDict: TypeAlias = dict[str, dict[str, bool]] | ||
| ConstructorType = TypeVar("ConstructorType", bound="ConstructorMixin") | ||
|
|
||
|
|
||
| class PermissionType(Enum): | ||
| class PermissionType(str, Enum): | ||
| # NOTE: Start using StrEnum with Python 3.11 | ||
| user = "user" | ||
| user_group = "userGroup" | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class PermissionIncrementalLoad: | ||
| permission: str | ||
| workspace_id: str | ||
| id: str | ||
| type: PermissionType | ||
| is_active: bool | ||
| class ConstructorMixin: | ||
| @staticmethod | ||
| def _get_id_and_type( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is there an underscore at the beginning?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a Python convention for "private" methods - it's not enforced at runtime, though.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I know but the combination with |
||
| permission: dict[str, Any], | ||
| ) -> tuple[str, PermissionType]: | ||
| user_id: str | None = permission.get("user_id") | ||
| user_group_id: str | None = permission.get("ug_id") | ||
| if user_id and user_group_id: | ||
| raise ValueError("Only one of user_id or ug_id must be present") | ||
| elif user_id: | ||
| return user_id, PermissionType.user | ||
| elif user_group_id: | ||
| return user_group_id, PermissionType.user_group | ||
| else: | ||
| raise ValueError("Either user_id or ug_id must be present") | ||
|
|
||
| @classmethod | ||
| def from_list_of_dicts( | ||
| cls, data: list[dict[str, Any]] | ||
| ) -> list["PermissionIncrementalLoad"]: | ||
| """Creates a list of User objects from list of dicts.""" | ||
| id: str | ||
| cls: type[ConstructorType], data: list[dict[str, Any]] | ||
| ) -> list[ConstructorType]: | ||
| """Creates a list of instances from list of dicts.""" | ||
| # NOTE: We can use typing.Self for the return type in Python 3.11 | ||
| permissions = [] | ||
| for permission in data: | ||
| user_id: str | None = permission.get("user_id") | ||
| user_group_id: str | None = permission.get("ug_id") | ||
|
|
||
| if user_id is not None: | ||
| target_type = PermissionType.user | ||
| id = user_id | ||
| elif user_group_id is not None: | ||
| target_type = PermissionType.user_group | ||
| id = user_group_id | ||
|
|
||
| permissions.append( | ||
| PermissionIncrementalLoad( | ||
| permission=permission["ws_permissions"], | ||
| workspace_id=permission["ws_id"], | ||
| id=id, | ||
| type=target_type, | ||
| is_active=str(permission["is_active"]).lower() == "true", | ||
| ) | ||
| ) | ||
| permissions.append(cls.from_dict(permission)) | ||
| return permissions | ||
|
|
||
| @classmethod | ||
| @abstractmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> Any: | ||
| """Construction form a dictionary to be implemented by subclasses.""" | ||
| pass | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class PermissionFullLoad: | ||
| class PermissionIncrementalLoad(BaseModel, ConstructorMixin): | ||
| permission: str | ||
| workspace_id: str | ||
| id: str | ||
| type: PermissionType | ||
| id_: str | ||
| type_: PermissionType | ||
| is_active: bool | ||
|
|
||
| @classmethod | ||
| def from_list_of_dicts( | ||
| cls, data: list[dict[str, Any]] | ||
| ) -> list["PermissionFullLoad"]: | ||
| """Creates a list of User objects from list of dicts.""" | ||
| permissions = [] | ||
| for permission in data: | ||
| id = ( | ||
| permission["user_id"] | ||
| if permission["user_id"] | ||
| else permission["ug_id"] | ||
| ) | ||
| def from_dict(cls, data: dict[str, Any]) -> "PermissionIncrementalLoad": | ||
| """Returns an instance of PermissionIncrementalLoad from a dictionary.""" | ||
| id_, target_type = cls._get_id_and_type(data) | ||
| return cls( | ||
| permission=data["ws_permissions"], | ||
| workspace_id=data["ws_id"], | ||
| id_=id_, | ||
| type_=target_type, | ||
| is_active=data["is_active"], | ||
| ) | ||
|
|
||
| if permission["user_id"]: | ||
| target_type = PermissionType.user | ||
| else: | ||
| target_type = PermissionType.user_group | ||
|
|
||
| permissions.append( | ||
| PermissionFullLoad( | ||
| permission=permission["ws_permissions"], | ||
| workspace_id=permission["ws_id"], | ||
| id=id, | ||
| type=target_type, | ||
| ) | ||
| ) | ||
| return permissions | ||
|
|
||
| class PermissionFullLoad(BaseModel, ConstructorMixin): | ||
| permission: str | ||
| workspace_id: str | ||
| id_: str | ||
| type_: PermissionType | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> "PermissionFullLoad": | ||
| """Returns an instance of PermissionFullLoad from a dictionary.""" | ||
| id_, target_type = cls._get_id_and_type(data) | ||
| return cls( | ||
| permission=data["ws_permissions"], | ||
| workspace_id=data["ws_id"], | ||
| id_=id_, | ||
| type_=target_type, | ||
| ) | ||
|
|
||
| @dataclass | ||
|
|
||
| @attrs.define | ||
| class PermissionDeclaration: | ||
| users: TargetsPermissionDict | ||
| user_groups: TargetsPermissionDict | ||
|
|
@@ -192,23 +190,25 @@ def to_sdk_api(self) -> CatalogDeclarativeWorkspacePermissions: | |
| permissions=permission_declarations | ||
| ) | ||
|
|
||
| def add_permission(self, permission: PermissionIncrementalLoad) -> None: | ||
| def add_incremental_permission( | ||
| self, permission: PermissionIncrementalLoad | ||
| ) -> None: | ||
| """ | ||
| Adds WSPermission object into respective field within the instance. | ||
| Handles duplicate permissions and different combinations of input | ||
| and upstream is_active permission states. | ||
| """ | ||
| target_dict = ( | ||
| self.users | ||
| if permission.type == PermissionType.user | ||
| if permission.type_ == PermissionType.user | ||
| else self.user_groups | ||
| ) | ||
|
|
||
| if permission.id not in target_dict: | ||
| target_dict[permission.id] = {} | ||
| if permission.id_ not in target_dict: | ||
| target_dict[permission.id_] = {} | ||
|
|
||
| is_active = permission.is_active | ||
| target_permissions = target_dict[permission.id] | ||
| target_permissions = target_dict[permission.id_] | ||
| permission_value = permission.permission | ||
|
|
||
| if permission_value not in target_permissions: | ||
|
|
@@ -225,6 +225,27 @@ def add_permission(self, permission: PermissionIncrementalLoad) -> None: | |
| ) | ||
| target_permissions[permission_value] = is_active | ||
|
|
||
| def add_full_load_permission(self, permission: PermissionFullLoad) -> None: | ||
| """ | ||
| Adds WSPermission object into respective field within the instance. | ||
| Handles duplicate permissions and different combinations of input | ||
| and upstream is_active permission states. | ||
| """ | ||
| target_dict = ( | ||
| self.users | ||
| if permission.type_ == PermissionType.user | ||
| else self.user_groups | ||
| ) | ||
|
|
||
| if permission.id_ not in target_dict: | ||
| target_dict[permission.id_] = {} | ||
|
|
||
| target_permissions = target_dict[permission.id_] | ||
| permission_value = permission.permission | ||
|
|
||
| if permission_value not in target_permissions: | ||
| target_permissions[permission_value] = True | ||
|
|
||
| def upsert(self, other: "PermissionDeclaration") -> None: | ||
| """ | ||
| Modifies the owner object by merging with the other. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just add a comment, please, that with 3.11 we should start using StrEnum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup. Made a similar comment for the return type on L45, with 3.11 it could be made simpler with typing.Self