diff --git a/dojo/api_v2/permissions.py b/dojo/api_v2/permissions.py index 905ddf99b58..86c8d93ad08 100644 --- a/dojo/api_v2/permissions.py +++ b/dojo/api_v2/permissions.py @@ -1,4 +1,3 @@ -import re from django.db.models import Model from django.shortcuts import get_object_or_404 @@ -20,6 +19,7 @@ from dojo.importers.auto_create_context import AutoCreateContextManager from dojo.models import ( Cred_Mapping, + Development_Environment, Dojo_Group, Endpoint, Engagement, @@ -27,6 +27,8 @@ Finding_Group, Product, Product_Type, + Regulation, + SLA_Configuration, Test, ) @@ -60,6 +62,72 @@ def check_object_permission( return False +class BaseRelatedObjectPermission(permissions.BasePermission): + + """ + An "abstract" base class for related object permissions (like notes, metadata, etc.) + that only need object permissions, not general permissions. This class will serve as + the base class for other more aptly named permission classes. + """ + + permission_map: dict[str, int] = { + "get_permission": None, + "put_permission": None, + "delete_permission": None, + "post_permission": None, + } + + def has_permission(self, request: Request, view): + # related object only need object permission + return True + + def has_object_permission(self, request: Request, view, obj): + return check_object_permission( + request, + obj, + **self.permission_map, + ) + + +class BaseDjangoModelPermission(permissions.BasePermission): + + """ + An "abstract" base class for Django model permissions. + This class will serve as the base class for other more aptly named permission classes. + """ + + django_model: Model = None + request_method_permission_map: dict[str, str] = { + "GET": "view", + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + def _evaluate_permissions(self, request: Request, permissions: dict[str, str]) -> bool: + # Short circuit if the request method is not in the expected methods + if request.method not in permissions: + return True + # Evaluate the permissions as usual + for method, permission in permissions.items(): + if request.method == method: + return user_has_configuration_permission( + request.user, + f"{self.django_model._meta.app_label}.{permission}_{self.django_model._meta.model_name}", + ) + return False + + def has_permission(self, request: Request, view): + # First restrict the mapping got GET/POST only + expected_request_method_permission_map = {k: v for k, v in self.request_method_permission_map.items() if k in {"GET", "POST"}} + # Evaluate the permissions + return self._evaluate_permissions(request, expected_request_method_permission_map) + + def has_object_permission(self, request: Request, view, obj): + return self._evaluate_permissions(request, self.request_method_permission_map) + + class UserHasAppAnalysisPermission(permissions.BasePermission): def has_permission(self, request, view): return check_post_permission( @@ -279,132 +347,82 @@ def has_object_permission(self, request, view, obj): class UserHasEngagementPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_engagement_post = re.compile(r"^/api/v2/engagements/$") - path_engagement = re.compile(r"^/api/v2/engagements/\d+/$") - def has_permission(self, request, view): - if UserHasEngagementPermission.path_engagement_post.match( - request.path, - ) or UserHasEngagementPermission.path_engagement.match(request.path): - return check_post_permission( + return check_post_permission( request, Product, "product", Permissions.Engagement_Add, ) - # related object only need object permission - return True def has_object_permission(self, request, view, obj): - if UserHasEngagementPermission.path_engagement_post.match( - request.path, - ) or UserHasEngagementPermission.path_engagement.match(request.path): - return check_object_permission( - request, - obj, - Permissions.Engagement_View, - Permissions.Engagement_Edit, - Permissions.Engagement_Delete, - ) return check_object_permission( request, obj, Permissions.Engagement_View, Permissions.Engagement_Edit, - Permissions.Engagement_Edit, - Permissions.Engagement_Edit, + Permissions.Engagement_Delete, ) -class UserHasRiskAcceptancePermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_risk_acceptance_post = re.compile(r"^/api/v2/risk_acceptances/$") - path_risk_acceptance = re.compile(r"^/api/v2/risk_acceptances/\d+/$") +class UserHasEngagementRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Engagement_View, + "put_permission": Permissions.Engagement_Edit, + "delete_permission": Permissions.Engagement_Edit, + "post_permission": Permissions.Engagement_Edit, + } + +class UserHasRiskAcceptancePermission(permissions.BasePermission): def has_permission(self, request, view): - if UserHasRiskAcceptancePermission.path_risk_acceptance_post.match( - request.path, - ) or UserHasRiskAcceptancePermission.path_risk_acceptance.match( - request.path, - ): - return check_post_permission( - request, Product, "product", Permissions.Risk_Acceptance, - ) - # related object only need object permission + # The previous implementation only checked for the object permission if the path was + # /api/v2/risk_acceptances/, but the path has always been /api/v2/risk_acceptance/ (notice the missing "s") + # So there really has not been a notion of a post permission check for risk acceptances. + # It would be best to leave as is to not break any existing implementations. return True def has_object_permission(self, request, view, obj): - if UserHasRiskAcceptancePermission.path_risk_acceptance_post.match( - request.path, - ) or UserHasRiskAcceptancePermission.path_risk_acceptance.match( - request.path, - ): - return check_object_permission( - request, - obj, - Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, - ) return check_object_permission( request, obj, Permissions.Risk_Acceptance, Permissions.Risk_Acceptance, Permissions.Risk_Acceptance, - Permissions.Risk_Acceptance, ) -class UserHasFindingPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_finding_post = re.compile(r"^/api/v2/findings/$") - path_finding = re.compile(r"^/api/v2/findings/\d+/$") - path_stub_finding_post = re.compile(r"^/api/v2/stub_findings/$") - path_stub_finding = re.compile(r"^/api/v2/stub_findings/\d+/$") +class UserHasRiskAcceptanceRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Risk_Acceptance, + "put_permission": Permissions.Risk_Acceptance, + "delete_permission": Permissions.Risk_Acceptance, + "post_permission": Permissions.Risk_Acceptance, + } + +class UserHasFindingPermission(permissions.BasePermission): def has_permission(self, request, view): - if ( - UserHasFindingPermission.path_finding_post.match(request.path) - or UserHasFindingPermission.path_finding.match(request.path) - or UserHasFindingPermission.path_stub_finding_post.match( - request.path, - ) - or UserHasFindingPermission.path_stub_finding.match(request.path) - ): - return check_post_permission( - request, Test, "test", Permissions.Finding_Add, - ) - # related object only need object permission - return True + return check_post_permission( + request, Test, "test", Permissions.Finding_Add, + ) def has_object_permission(self, request, view, obj): - if ( - UserHasFindingPermission.path_finding_post.match(request.path) - or UserHasFindingPermission.path_finding.match(request.path) - or UserHasFindingPermission.path_stub_finding_post.match( - request.path, - ) - or UserHasFindingPermission.path_stub_finding.match(request.path) - ): - return check_object_permission( - request, - obj, - Permissions.Finding_View, - Permissions.Finding_Edit, - Permissions.Finding_Delete, - ) return check_object_permission( request, obj, Permissions.Finding_View, Permissions.Finding_Edit, - Permissions.Finding_Edit, - Permissions.Finding_Edit, + Permissions.Finding_Delete, ) +class UserHasFindingRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Finding_View, + "put_permission": Permissions.Finding_Edit, + "delete_permission": Permissions.Finding_Edit, + "post_permission": Permissions.Finding_Edit, + } + + class UserHasImportPermission(permissions.BasePermission): def has_permission(self, request, view): # permission check takes place before validation, so we don't have access to serializer.validated_data() @@ -761,42 +779,30 @@ def has_permission(self, request, view): class UserHasTestPermission(permissions.BasePermission): - # Permission checks for related objects (like notes or metadata) can be moved - # into a seperate class, when the legacy authorization will be removed. - path_tests_post = re.compile(r"^/api/v2/tests/$") - path_tests = re.compile(r"^/api/v2/tests/\d+/$") - def has_permission(self, request, view): - if UserHasTestPermission.path_tests_post.match( - request.path, - ) or UserHasTestPermission.path_tests.match(request.path): - return check_post_permission( - request, Engagement, "engagement", Permissions.Test_Add, - ) - # related object only need object permission - return True + return check_post_permission( + request, Engagement, "engagement", Permissions.Test_Add, + ) def has_object_permission(self, request, view, obj): - if UserHasTestPermission.path_tests_post.match( - request.path, - ) or UserHasTestPermission.path_tests.match(request.path): - return check_object_permission( - request, - obj, - Permissions.Test_View, - Permissions.Test_Edit, - Permissions.Test_Delete, - ) return check_object_permission( request, obj, Permissions.Test_View, Permissions.Test_Edit, - Permissions.Test_Edit, - Permissions.Test_Edit, + Permissions.Test_Delete, ) +class UserHasTestRelatedObjectPermission(BaseRelatedObjectPermission): + permission_map = { + "get_permission": Permissions.Test_View, + "put_permission": Permissions.Test_Edit, + "delete_permission": Permissions.Test_Edit, + "post_permission": Permissions.Test_Edit, + } + + class UserHasTestImportPermission(permissions.BasePermission): def has_permission(self, request, view): return check_post_permission( @@ -1023,6 +1029,36 @@ def has_object_permission(self, request, view, obj): ) +class UserHasSLAPermission(BaseDjangoModelPermission): + django_model = SLA_Configuration + + +class UserHasDevelopmentEnvironmentPermission(BaseDjangoModelPermission): + django_model = Development_Environment + # https://github.com/DefectDojo/django-DefectDojo/blob/963d4a35bfd8f5138330f0d70595a755fa4999b0/dojo/user/utils.py#L93 + # It looks like view permission was explicitly not supported, so I assume + # reading these endpoints are not necessarily restricted (unless you're auth'd of course) + request_method_permission_map = { + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + +class UserHasRegulationPermission(BaseDjangoModelPermission): + django_model = Regulation + # https://github.com/DefectDojo/django-DefectDojo/blob/963d4a35bfd8f5138330f0d70595a755fa4999b0/dojo/user/utils.py#L104 + # It looks like view permission was explicitly not supported, so I assume + # reading these endpoints are not necessarily restricted (unless you're auth'd of course) + request_method_permission_map = { + "POST": "add", + "PUT": "change", + "PATCH": "change", + "DELETE": "delete", + } + + def raise_no_auto_create_import_validation_error( test_title, scan_type, diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 9c27e1f7820..a083535ef42 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -503,7 +503,7 @@ def generate_report(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) def notes(self, request, pk=None): engagement = self.get_object() if request.method == "POST": @@ -567,7 +567,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], ) def files(self, request, pk=None): engagement = self.get_object() @@ -603,7 +603,7 @@ def files(self, request, pk=None): status.HTTP_201_CREATED: serializers.EngagementCheckListSerializer, }, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission]) def complete_checklist(self, request, pk=None): engagement = self.get_object() check_lists = Check_List.objects.filter(engagement=engagement) @@ -650,6 +650,7 @@ def complete_checklist(self, request, pk=None): detail=True, methods=["get"], url_path=r"files/download/(?P\d+)", + permission_classes=[IsAuthenticated, permissions.UserHasEngagementRelatedObjectPermission], ) def download_file(self, request, file_id, pk=None): engagement = self.get_object() @@ -735,7 +736,7 @@ def get_queryset(self): status.HTTP_200_OK: serializers.RiskAcceptanceProofSerializer, }, ) - @action(detail=True, methods=["get"]) + @action(detail=True, methods=["get"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) def download_proof(self, request, pk=None): risk_acceptance = self.get_object() # Get the file object @@ -937,7 +938,7 @@ def get_serializer_class(self): request=serializers.FindingCloseSerializer, responses={status.HTTP_200_OK: serializers.FindingCloseSerializer}, ) - @action(detail=True, methods=["post"]) + @action(detail=True, methods=["post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def close(self, request, pk=None): finding = self.get_object() @@ -978,7 +979,7 @@ def close(self, request, pk=None): request=serializers.TagSerializer, responses={status.HTTP_201_CREATED: serializers.TagSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def tags(self, request, pk=None): finding = self.get_object() @@ -1019,7 +1020,7 @@ def tags(self, request, pk=None): status.HTTP_201_CREATED: serializers.BurpRawRequestResponseSerializer, }, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def request_response(self, request, pk=None): finding = self.get_object() @@ -1069,7 +1070,7 @@ def request_response(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def notes(self, request, pk=None): finding = self.get_object() if request.method == "POST": @@ -1137,7 +1138,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def files(self, request, pk=None): finding = self.get_object() @@ -1175,7 +1176,7 @@ def files(self, request, pk=None): @action( detail=True, methods=["get"], - url_path=r"files/download/(?P\d+)", + url_path=r"files/download/(?P\d+)", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def download_file(self, request, file_id, pk=None): finding = self.get_object() @@ -1196,7 +1197,7 @@ def download_file(self, request, file_id, pk=None): request=serializers.FindingNoteSerializer, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["patch"]) + @action(detail=True, methods=["patch"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def remove_note(self, request, pk=None): """Remove Note From Finding Note""" finding = self.get_object() @@ -1235,7 +1236,7 @@ def remove_note(self, request, pk=None): request=serializers.TagSerializer, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["put", "patch"]) + @action(detail=True, methods=["put", "patch"], permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def remove_tags(self, request, pk=None): """Remove Tag(s) from finding list of tags""" finding = self.get_object() @@ -1285,6 +1286,7 @@ def remove_tags(self, request, pk=None): url_path=r"duplicate", filter_backends=[], pagination_class=None, + permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def get_duplicate_cluster(self, request, pk): finding = self.get_object() @@ -1298,7 +1300,7 @@ def get_duplicate_cluster(self, request, pk): request=OpenApiTypes.NONE, responses={status.HTTP_204_NO_CONTENT: ""}, ) - @action(detail=True, methods=["post"], url_path=r"duplicate/reset") + @action(detail=True, methods=["post"], url_path=r"duplicate/reset", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission)) def reset_finding_duplicate_status(self, request, pk): checked_duplicate_id = reset_finding_duplicate_status_internal( request.user, pk, @@ -1317,7 +1319,7 @@ def reset_finding_duplicate_status(self, request, pk): responses={status.HTTP_204_NO_CONTENT: ""}, ) @action( - detail=True, methods=["post"], url_path=r"original/(?P\d+)", + detail=True, methods=["post"], url_path=r"original/(?P\d+)", permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def set_finding_as_original(self, request, pk, new_fid): success = set_finding_as_original_internal(request.user, pk, new_fid) @@ -1493,6 +1495,7 @@ def _remove_metadata(self, request, finding): methods=["post", "put", "delete", "get"], filter_backends=[], pagination_class=None, + permission_classes=(IsAuthenticated, permissions.UserHasFindingRelatedObjectPermission), ) def metadata(self, request, pk=None): finding = self.get_object() @@ -2036,7 +2039,7 @@ class DevelopmentEnvironmentViewSet( serializer_class = serializers.DevelopmentEnvironmentSerializer queryset = Development_Environment.objects.none() filter_backends = (DjangoFilterBackend,) - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasDevelopmentEnvironmentPermission) def get_queryset(self): return Development_Environment.objects.all().order_by("id") @@ -2128,7 +2131,7 @@ def generate_report(self, request, pk=None): request=serializers.AddNewNoteOptionSerializer, responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, ) - @action(detail=True, methods=["get", "post"]) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission)) def notes(self, request, pk=None): test = self.get_object() if request.method == "POST": @@ -2190,7 +2193,7 @@ def notes(self, request, pk=None): responses={status.HTTP_201_CREATED: serializers.FileSerializer}, ) @action( - detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), + detail=True, methods=["get", "post"], parser_classes=(MultiPartParser,), permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), ) def files(self, request, pk=None): test = self.get_object() @@ -2229,6 +2232,7 @@ def files(self, request, pk=None): detail=True, methods=["get"], url_path=r"files/download/(?P\d+)", + permission_classes=(IsAuthenticated, permissions.UserHasTestRelatedObjectPermission), ) def download_file(self, request, file_id, pk=None): test = self.get_object() @@ -2388,7 +2392,7 @@ class RegulationsViewSet( queryset = Regulation.objects.none() filter_backends = (DjangoFilterBackend,) filterset_fields = ["id", "name", "description"] - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasRegulationPermission) def get_queryset(self): return Regulation.objects.all().order_by("id") @@ -2742,7 +2746,7 @@ class BurpRawRequestResponseViewSet( filterset_fields = ["finding"] permission_classes = ( IsAuthenticated, - permissions.UserHasFindingPermission, + permissions.UserHasFindingRelatedObjectPermission, ) def get_queryset(self): @@ -3129,7 +3133,7 @@ class SLAConfigurationViewset( serializer_class = serializers.SLAConfigurationSerializer queryset = SLA_Configuration.objects.none() filter_backends = (DjangoFilterBackend,) - permission_classes = (IsAuthenticated, DjangoModelPermissions) + permission_classes = (IsAuthenticated, permissions.UserHasSLAPermission) def get_queryset(self): return SLA_Configuration.objects.all().order_by("id") @@ -3143,7 +3147,7 @@ class QuestionnaireQuestionViewSet( queryset = Question.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3159,7 +3163,7 @@ class QuestionnaireAnswerViewSet( queryset = Answer.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3174,7 +3178,7 @@ class QuestionnaireGeneralSurveyViewSet( queryset = General_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3189,7 +3193,7 @@ class QuestionnaireEngagementSurveyViewSet( queryset = Engagement_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) @@ -3230,7 +3234,7 @@ class QuestionnaireAnsweredSurveyViewSet( queryset = Answered_Survey.objects.none() filter_backends = (DjangoFilterBackend,) permission_classes = ( - permissions.UserHasEngagementPermission, + permissions.UserHasEngagementRelatedObjectPermission, DjangoModelPermissions, ) diff --git a/dojo/fixtures/dojo_testdata.json b/dojo/fixtures/dojo_testdata.json index 26148621eaf..9575d2aba3a 100644 --- a/dojo/fixtures/dojo_testdata.json +++ b/dojo/fixtures/dojo_testdata.json @@ -112,6 +112,24 @@ "date_joined": "2018-04-13T07:59:51.527Z" } }, + { + "pk": 7, + "model": "auth.user", + "fields": { + "username": "globalWriter", + "first_name": "Global", + "last_name": "Writer", + "is_active": true, + "is_superuser": false, + "is_staff": false, + "last_login": null, + "groups": [], + "user_permissions": [], + "password": "pbkdf2_sha256$36000$pe8Ff8HrBPac$Lb3ee6/R9z/aL9nM+D2AXWTpIt9Pa9kcLueXxYNy1ZY=", + "email": "global_writer@email.com", + "date_joined": "2018-04-13T07:59:51.527Z" + } + }, { "pk": "2dqr18yqu9mzb87abk0okid75w2clakl", "model": "sessions.session", @@ -2804,6 +2822,14 @@ "created": "2018-04-16T06:54:35.933Z" } }, + { + "pk": "184770c4c3256aba904297610fbb4da3fa15ba37", + "model": "authtoken.token", + "fields": { + "user": 7, + "created": "2018-04-16T06:54:35.933Z" + } + }, { "pk": "1", "model": "dojo.dojo_group", @@ -2871,6 +2897,14 @@ "role": 5 } }, + { + "pk": 4, + "model": "dojo.global_role", + "fields": { + "user": 7, + "role": 2 + } + }, { "model": "dojo.language_type", "pk": 1, diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index 7c5b289a211..151d22e5d58 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -215,7 +215,7 @@ def test_product_types(self, mock): with self.subTest("product_type_added"): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod_type = Product_Type.objects.create(name="notif prod type") - self.assertEqual(mock.call_count, last_count + 4) + self.assertEqual(mock.call_count, last_count + 5) self.assertEqual(mock.call_args_list[-1].args[0], "product_type_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/type/{prod_type.id}") @@ -236,7 +236,7 @@ def test_products(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "product_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/{prod.id}") @@ -257,7 +257,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): prod = Product.objects.first() eng = Engagement.objects.create(product=prod, target_start=timezone.now(), target_end=timezone.now()) - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_added") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}") @@ -266,7 +266,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): eng.status = "Completed" eng.save() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_closed") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}/finding/all") @@ -275,7 +275,7 @@ def test_engagements(self, mock): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): eng.status = "In Progress" eng.save() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "engagement_reopened") self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng.id}") @@ -376,7 +376,7 @@ def test_finding_groups(self, mock): with self.subTest("test_deleted itself"): with set_actor(self.notification_tester), pghistory.context(user=self.notification_tester.id): fg2.delete() - self.assertEqual(mock.call_count, last_count + 5) + self.assertEqual(mock.call_count, last_count + 6) self.assertEqual(mock.call_args_list[-1].args[0], "finding_group_deleted") self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The finding group "fg test" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/test/{test2.id}") diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 7f2001dd193..7e9af36d82e 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -14,6 +14,7 @@ from django.conf import settings from django.contrib.auth.models import Permission +from django.core.files.uploadedfile import SimpleUploadedFile from django.test import tag as test_tag from django.test.utils import override_settings from django.urls import reverse @@ -21,6 +22,7 @@ from drf_spectacular.drainage import GENERATOR_STATS from drf_spectacular.settings import spectacular_settings from drf_spectacular.validation import validate_schema +from parameterized import parameterized from rest_framework import status from rest_framework.authtoken.models import Token from rest_framework.mixins import ( @@ -368,34 +370,36 @@ class TestType(Enum): class BaseClass: class RESTEndpointTest(DojoAPITestCase): + NOT_AUTHORIZED_USER_ID = 3 + GLOBAL_READER_USER_ID = 5 + GLOBAL_WRITER_USER_ID = 7 + GLOBAL_OWNER_USER_ID = 6 + def __init__(self, *args, **kwargs): DojoAPITestCase.__init__(self, *args, **kwargs) - def setUp(self): - testuser = User.objects.get(username="admin") + def _get_client(self, user_criteria: dict) -> None: + testuser = User.objects.get(**user_criteria) token = Token.objects.get(user=testuser) self.client = APIClient() self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + + def setUp(self): + self._get_client({"username": "admin"}) self.url = reverse(self.viewname + "-list") self.schema = get_open_api3_json_schema() def setUp_not_authorized(self): - testuser = User.objects.get(id=3) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.NOT_AUTHORIZED_USER_ID}) def setUp_global_reader(self): - testuser = User.objects.get(id=5) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.GLOBAL_READER_USER_ID}) + + def setUp_global_writer(self): + self._get_client({"id": self.GLOBAL_WRITER_USER_ID}) def setUp_global_owner(self): - testuser = User.objects.get(id=6) - token = Token.objects.get(user=testuser) - self.client = APIClient() - self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + self._get_client({"id": self.GLOBAL_OWNER_USER_ID}) def check_schema(self, schema, obj): schema_checker = SchemaChecker(self.schema["components"]) @@ -1179,6 +1183,42 @@ def __init__(self, *args, **kwargs): self.deleted_objects = 23 BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs) + @parameterized.expand( + [ + ("files", {"title": "test", "file": b"empty"}), + ("notes", {"entry": "string"}), + ], + ) + def test_related_objects(self, related_object_path, payload): + """ + Tests that BaseRelatedObjectPermission enforces the permissions not associated + with the base object. For example, even though a request to add a note to an + engagement is a POST, we do not need engagement add permissions, but rather + engagement edit permissions since that is what is defined in the + UserHasEngagementRelatedObjectPermission class + """ + self.setUp_global_reader() + # Get an engagement + response = self.client.get(self.url, format="json") + self.assertEqual(200, response.status_code, response.content[:1000]) + engagement_id = response.data["results"][0]["id"] + # Attempt to add a related object + relative_url = f"{self.url}{engagement_id}/{related_object_path}/" + response = self.client.post(relative_url, payload) + self.assertEqual(403, response.status_code, response.content[:1000]) + # Now switch to a user with edit permissions (but not create) + self.setUp_global_writer() + # Retry adding the related object + if related_object_path == "files": + # Convert bytes to a mock uploaded file + payload["file"] = SimpleUploadedFile( + name="test_file.txt", + content=payload["file"], # the b"empty" + content_type="text/plain", + ) + response = self.client.post(relative_url, payload) + self.assertEqual(201, response.status_code, response.content[:1000]) + class RiskAcceptanceTest(BaseClass.BaseClassTest): fixtures = ["dojo_testdata.json"] @@ -3440,6 +3480,66 @@ def test_delete(self): response = self.client.delete(relative_url) self.assertEqual(409, response.status_code, response.content[:1000]) + def test_list_method_requires_no_authorization(self): + """ + Tests the use case of not supplying GET permissions for the BaseDjangoModelPermission + class used in the UserHasDevelopmentEnvironmentPermission class. + """ + self.setUp_not_authorized() + response = self.client.get(self.url, format="json") + self.assertEqual(200, response.status_code, response.content[:1000]) + + @parameterized.expand( + [ + ( + "add_development_environment", + "post", + 201, + { + "name": "Test_1", + }, + ), + ( + "change_development_environment", + "put", + 200, + {"name": "Test_2"}, + ), + ( + "change_development_environment", + "put", + 200, + {"name": "Test_3"}, + ), + ( + "delete_development_environment", + "delete", + 409, # Deletion is blocked because of existing references, but it is better than 403 for this test + None, + ), + ], + ) + def test_user_needs_configuration_permission(self, codename, method, expected_status, payload): + """ + Tests that BaseDjangoModelPermission enforces the django configuration permissions + through the class used in the UserHasDevelopmentEnvironmentPermission class. + """ + # Ensure we get a 403 first + self.setUp_not_authorized() + response = self.client.post(self.url, payload, format="json") + self.assertEqual(403, response.status_code, response.content[:1000]) + # Now Get the same user as self.client is using, add the permission, and try again + testuser = User.objects.get(id=self.NOT_AUTHORIZED_USER_ID) + permission = Permission.objects.get(codename=codename) + testuser.user_permissions.add(permission) + if method in {"put", "patch", "delete"}: + current_objects = self.client.get(self.url, format="json").data + relative_url = self.url + "{}/".format(current_objects["results"][-1]["id"]) + else: + relative_url = self.url + response = getattr(self.client, method)(relative_url, payload, format="json") + self.assertEqual(expected_status, response.status_code, response.content[:1000]) + class TestTypeTest(BaseClass.AuthenticatedViewTest): fixtures = ["dojo_testdata.json"] @@ -3635,7 +3735,7 @@ def __init__(self, *args, **kwargs): } self.update_fields = {"style": "warning"} self.test_type = TestType.CONFIGURATION_PERMISSIONS - self.deleted_objects = 7 + self.deleted_objects = 8 BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs) def test_create(self):