11# (C) 2025 GoodData Corporation
2- from dataclasses import dataclass
2+ from abc import abstractmethod
33from enum import Enum
4- from typing import Any , Iterator , TypeAlias
4+ from typing import Any , Iterator , Self , TypeAlias
55
6+ import attrs
67from gooddata_sdk .catalog .identifier import CatalogAssigneeIdentifier
78from gooddata_sdk .catalog .permission .declarative_model .permission import (
89 CatalogDeclarativeSingleWorkspacePermission ,
910 CatalogDeclarativeWorkspacePermissions ,
1011)
12+ from pydantic import BaseModel
1113
1214from gooddata_pipelines .provisioning .utils .exceptions import BaseUserException
1315
14- # TODO: refactor the full load and incremental load models to reuse as much as possible
15- # TODO: use pydantic models instead of dataclasses?
16- # TODO: make the validation logic more readable (as in PermissionIncrementalLoad)
17-
1816TargetsPermissionDict : TypeAlias = dict [str , dict [str , bool ]]
1917
2018
21- class PermissionType (Enum ):
19+ class PermissionType (str , Enum ):
2220 user = "user"
2321 user_group = "userGroup"
2422
2523
26- @dataclass (frozen = True )
27- class PermissionIncrementalLoad :
28- permission : str
29- workspace_id : str
30- id : str
31- type : PermissionType
32- is_active : bool
24+ class ConstructorMixin :
25+ @staticmethod
26+ def _get_id_and_type (
27+ permission : dict [str , Any ],
28+ ) -> tuple [str , PermissionType ]:
29+ user_id : str | None = permission .get ("user_id" )
30+ user_group_id : str | None = permission .get ("ug_id" )
31+ if user_id and user_group_id :
32+ raise ValueError ("Only one of user_id or ug_id must be present" )
33+ elif user_id :
34+ return user_id , PermissionType .user
35+ elif user_group_id :
36+ return user_group_id , PermissionType .user_group
37+ else :
38+ raise ValueError ("Either user_id or ug_id must be present" )
3339
3440 @classmethod
35- def from_list_of_dicts (
36- cls , data : list [dict [str , Any ]]
37- ) -> list ["PermissionIncrementalLoad" ]:
38- """Creates a list of User objects from list of dicts."""
39- id : str
41+ def from_list_of_dicts (cls , data : list [dict [str , Any ]]) -> list [Self ]:
42+ """Creates a list of instances from list of dicts."""
4043 permissions = []
4144 for permission in data :
42- user_id : str | None = permission .get ("user_id" )
43- user_group_id : str | None = permission .get ("ug_id" )
44-
45- if user_id is not None :
46- target_type = PermissionType .user
47- id = user_id
48- elif user_group_id is not None :
49- target_type = PermissionType .user_group
50- id = user_group_id
51-
52- permissions .append (
53- PermissionIncrementalLoad (
54- permission = permission ["ws_permissions" ],
55- workspace_id = permission ["ws_id" ],
56- id = id ,
57- type = target_type ,
58- is_active = str (permission ["is_active" ]).lower () == "true" ,
59- )
60- )
45+ permissions .append (cls .from_dict (permission ))
6146 return permissions
6247
48+ @classmethod
49+ @abstractmethod
50+ def from_dict (cls , data : dict [str , Any ]) -> Any :
51+ """Construction form a dictionary to be implemented by subclasses."""
52+ pass
53+
6354
64- @dataclass (frozen = True )
65- class PermissionFullLoad :
55+ class PermissionIncrementalLoad (BaseModel , ConstructorMixin ):
6656 permission : str
6757 workspace_id : str
68- id : str
69- type : PermissionType
58+ id_ : str
59+ type_ : PermissionType
60+ is_active : bool
7061
7162 @classmethod
72- def from_list_of_dicts (
73- cls , data : list [dict [str , Any ]]
74- ) -> list ["PermissionFullLoad" ]:
75- """Creates a list of User objects from list of dicts."""
76- permissions = []
77- for permission in data :
78- id = (
79- permission ["user_id" ]
80- if permission ["user_id" ]
81- else permission ["ug_id" ]
82- )
63+ def from_dict (cls , data : dict [str , Any ]) -> "PermissionIncrementalLoad" :
64+ """Returns an instance of PermissionIncrementalLoad from a dictionary."""
65+ id_ , target_type = cls ._get_id_and_type (data )
66+ return cls (
67+ permission = data ["ws_permissions" ],
68+ workspace_id = data ["ws_id" ],
69+ id_ = id_ ,
70+ type_ = target_type ,
71+ is_active = data ["is_active" ],
72+ )
8373
84- if permission ["user_id" ]:
85- target_type = PermissionType .user
86- else :
87- target_type = PermissionType .user_group
88-
89- permissions .append (
90- PermissionFullLoad (
91- permission = permission ["ws_permissions" ],
92- workspace_id = permission ["ws_id" ],
93- id = id ,
94- type = target_type ,
95- )
96- )
97- return permissions
9874
75+ class PermissionFullLoad (BaseModel , ConstructorMixin ):
76+ permission : str
77+ workspace_id : str
78+ id_ : str
79+ type_ : PermissionType
9980
100- @dataclass
81+ @classmethod
82+ def from_dict (cls , data : dict [str , Any ]) -> "PermissionFullLoad" :
83+ """Returns an instance of PermissionFullLoad from a dictionary."""
84+ id_ , target_type = cls ._get_id_and_type (data )
85+ return cls (
86+ permission = data ["ws_permissions" ],
87+ workspace_id = data ["ws_id" ],
88+ id_ = id_ ,
89+ type_ = target_type ,
90+ )
91+
92+
93+ @attrs .define
10194class PermissionDeclaration :
10295 users : TargetsPermissionDict
10396 user_groups : TargetsPermissionDict
@@ -192,23 +185,25 @@ def to_sdk_api(self) -> CatalogDeclarativeWorkspacePermissions:
192185 permissions = permission_declarations
193186 )
194187
195- def add_permission (self , permission : PermissionIncrementalLoad ) -> None :
188+ def add_incremental_permission (
189+ self , permission : PermissionIncrementalLoad
190+ ) -> None :
196191 """
197192 Adds WSPermission object into respective field within the instance.
198193 Handles duplicate permissions and different combinations of input
199194 and upstream is_active permission states.
200195 """
201196 target_dict = (
202197 self .users
203- if permission .type == PermissionType .user
198+ if permission .type_ == PermissionType .user
204199 else self .user_groups
205200 )
206201
207- if permission .id not in target_dict :
208- target_dict [permission .id ] = {}
202+ if permission .id_ not in target_dict :
203+ target_dict [permission .id_ ] = {}
209204
210205 is_active = permission .is_active
211- target_permissions = target_dict [permission .id ]
206+ target_permissions = target_dict [permission .id_ ]
212207 permission_value = permission .permission
213208
214209 if permission_value not in target_permissions :
@@ -225,6 +220,27 @@ def add_permission(self, permission: PermissionIncrementalLoad) -> None:
225220 )
226221 target_permissions [permission_value ] = is_active
227222
223+ def add_full_load_permission (self , permission : PermissionFullLoad ) -> None :
224+ """
225+ Adds WSPermission object into respective field within the instance.
226+ Handles duplicate permissions and different combinations of input
227+ and upstream is_active permission states.
228+ """
229+ target_dict = (
230+ self .users
231+ if permission .type_ == PermissionType .user
232+ else self .user_groups
233+ )
234+
235+ if permission .id_ not in target_dict :
236+ target_dict [permission .id_ ] = {}
237+
238+ target_permissions = target_dict [permission .id_ ]
239+ permission_value = permission .permission
240+
241+ if permission_value not in target_permissions :
242+ target_permissions [permission_value ] = True
243+
228244 def upsert (self , other : "PermissionDeclaration" ) -> None :
229245 """
230246 Modifies the owner object by merging with the other.
0 commit comments