diff --git a/docs/content/en/open_source/upgrading/2.55.md b/docs/content/en/open_source/upgrading/2.55.md index c87be940b5a..90df965950d 100644 --- a/docs/content/en/open_source/upgrading/2.55.md +++ b/docs/content/en/open_source/upgrading/2.55.md @@ -2,6 +2,12 @@ title: 'Upgrading to DefectDojo Version 2.55.x' toc_hide: true weight: -20260105 -description: No special instructions. +description: Authorization related optimizations --- -There are no special instructions for upgrading to 2.55.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.55.0) for the contents of the release. + +## Authorization related optimizations + +The queries related to authorizations have been optmized. For example retrieving the list of authorized findings for the logged in user. +Some of these are now also cached during that duration of a request. This should have no functional effects and only results in better performance. + +Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.55.0) for the contents of the release. diff --git a/dojo/__init__.py b/dojo/__init__.py index fe74ae3fec3..31d0198e16f 100644 --- a/dojo/__init__.py +++ b/dojo/__init__.py @@ -5,5 +5,5 @@ from .celery import app as celery_app # noqa: F401 __version__ = "2.55.0-dev" -__url__ = "https://github.com/DefectDojo/django-DefectDojo" -__docs__ = "https://documentation.defectdojo.com" +__url__ = "https://github.com/DefectDojo/django-DefectDojo" # noqa: RUF067 +__docs__ = "https://documentation.defectdojo.com" # noqa: RUF067 diff --git a/dojo/cred/queries.py b/dojo/cred/queries.py index beb84129bab..a9dab0fb58e 100644 --- a/dojo/cred/queries.py +++ b/dojo/cred/queries.py @@ -1,17 +1,21 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Cred_Mapping, Product_Group, Product_Member, Product_Type_Group, Product_Type_Member +from dojo.request_cache import cache_for_request -def get_authorized_cred_mappings(permission, queryset=None): +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_cred_mappings(permission): + """Cached - returns all cred mappings the user is authorized to see.""" user = get_current_user() if user is None: return Cred_Mapping.objects.none() - cred_mappings = Cred_Mapping.objects.all().order_by("id") if queryset is None else queryset + cred_mappings = Cred_Mapping.objects.all().order_by("id") if user.is_superuser: return cred_mappings @@ -20,27 +24,69 @@ def get_authorized_cred_mappings(permission, queryset=None): return cred_mappings roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - cred_mappings = cred_mappings.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return cred_mappings.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ) + + +def get_authorized_cred_mappings_for_queryset(permission, queryset): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" + user = get_current_user() + + if user is None: + return Cred_Mapping.objects.none() + + if user.is_superuser: + return queryset + + if user_has_global_permission(user, permission): + return queryset + + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return queryset.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ) diff --git a/dojo/cred/views.py b/dojo/cred/views.py index be5d0560683..4feaddb73b6 100644 --- a/dojo/cred/views.py +++ b/dojo/cred/views.py @@ -8,7 +8,7 @@ from dojo.authorization.authorization_decorators import user_is_authorized, user_is_configuration_authorized from dojo.authorization.roles_permissions import Permissions -from dojo.cred.queries import get_authorized_cred_mappings +from dojo.cred.queries import get_authorized_cred_mappings_for_queryset from dojo.forms import CredMappingForm, CredMappingFormProd, CredUserForm, NoteForm from dojo.models import Cred_Mapping, Cred_User, Engagement, Finding, Product, Test from dojo.utils import Product_Tab, add_breadcrumb, dojo_crypto_encrypt, prepare_for_view @@ -85,7 +85,7 @@ def view_cred_details(request, ttid): notes = cred.notes.all() cred_products = Cred_Mapping.objects.select_related("product").filter( product_id__isnull=False, cred_id=ttid).order_by("product__name") - cred_products = get_authorized_cred_mappings(Permissions.Product_View, cred_products) + cred_products = get_authorized_cred_mappings_for_queryset(Permissions.Product_View, cred_products) if request.method == "POST": form = NoteForm(request.POST) diff --git a/dojo/endpoint/queries.py b/dojo/endpoint/queries.py index 4a6f2ae56cd..f8336b75f75 100644 --- a/dojo/endpoint/queries.py +++ b/dojo/endpoint/queries.py @@ -1,5 +1,5 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import ( @@ -10,17 +10,20 @@ Product_Type_Group, Product_Type_Member, ) +from dojo.request_cache import cache_for_request -def get_authorized_endpoints(permission, queryset=None, user=None): - +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_endpoints(permission, user=None): + """Cached - returns all endpoints the user is authorized to see.""" if user is None: user = get_current_user() if user is None: return Endpoint.objects.none() - endpoints = Endpoint.objects.all().order_by("id") if queryset is None else queryset + endpoints = Endpoint.objects.all().order_by("id") if user.is_superuser: return endpoints @@ -29,41 +32,86 @@ def get_authorized_endpoints(permission, queryset=None, user=None): return endpoints roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - endpoints = endpoints.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return endpoints.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ) -def get_authorized_endpoint_status(permission, queryset=None, user=None): +def get_authorized_endpoints_for_queryset(permission, queryset, user=None): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" + if user is None: + user = get_current_user() + if user is None: + return Endpoint.objects.none() + + if user.is_superuser: + return queryset + + if user_has_global_permission(user, permission): + return queryset + + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return queryset.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ) + + +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_endpoint_status(permission, user=None): + """Cached - returns all endpoint statuses the user is authorized to see.""" if user is None: user = get_current_user() if user is None: return Endpoint_Status.objects.none() - endpoint_status = Endpoint_Status.objects.all().order_by("id") if queryset is None else queryset + endpoint_status = Endpoint_Status.objects.all().order_by("id") if user.is_superuser: return endpoint_status @@ -72,27 +120,70 @@ def get_authorized_endpoint_status(permission, queryset=None, user=None): return endpoint_status roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("endpoint__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("endpoint__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("endpoint__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("endpoint__product_id"), - group__users=user, - role__in=roles) - endpoint_status = endpoint_status.annotate( - endpoint__product__prod_type__member=Exists(authorized_product_type_roles), - endpoint__product__member=Exists(authorized_product_roles), - endpoint__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - endpoint__product__authorized_group=Exists(authorized_product_groups)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return endpoint_status.filter( - Q(endpoint__product__prod_type__member=True) | Q(endpoint__product__member=True) - | Q(endpoint__product__prod_type__authorized_group=True) | Q(endpoint__product__authorized_group=True)) + Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(endpoint__product_id__in=Subquery(authorized_product_roles)) + | Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(endpoint__product_id__in=Subquery(authorized_product_groups)), + ) + + +def get_authorized_endpoint_status_for_queryset(permission, queryset, user=None): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" + if user is None: + user = get_current_user() + + if user is None: + return Endpoint_Status.objects.none() + + if user.is_superuser: + return queryset + + if user_has_global_permission(user, permission): + return queryset + + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return queryset.filter( + Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(endpoint__product_id__in=Subquery(authorized_product_roles)) + | Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(endpoint__product_id__in=Subquery(authorized_product_groups)), + ) diff --git a/dojo/endpoint/views.py b/dojo/endpoint/views.py index 1dc4df898c6..24735d35f51 100644 --- a/dojo/endpoint/views.py +++ b/dojo/endpoint/views.py @@ -18,7 +18,7 @@ from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.authorization_decorators import user_is_authorized from dojo.authorization.roles_permissions import Permissions -from dojo.endpoint.queries import get_authorized_endpoints +from dojo.endpoint.queries import get_authorized_endpoints_for_queryset from dojo.endpoint.utils import clean_hosts_run, endpoint_meta_import from dojo.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.forms import AddEndpointForm, DeleteEndpointForm, DojoMetaDataForm, EditEndpointForm, ImportEndpointMetaForm @@ -52,7 +52,7 @@ def process_endpoints_view(request, *, host_view=False, vulnerable=False): endpoints = Endpoint.objects.all() endpoints = endpoints.prefetch_related("product", "product__tags", "tags").distinct() - endpoints = get_authorized_endpoints(Permissions.Endpoint_View, endpoints, request.user) + endpoints = get_authorized_endpoints_for_queryset(Permissions.Endpoint_View, endpoints, request.user) filter_string_matching = get_system_setting("filter_string_matching", False) filter_class = EndpointFilterWithoutObjectLookups if filter_string_matching else EndpointFilter if host_view: @@ -365,7 +365,7 @@ def endpoint_bulk_update_all(request, pid=None): product = get_object_or_404(Product, id=pid) user_has_permission_or_403(request.user, product, Permissions.Endpoint_Delete) - endpoints = get_authorized_endpoints(Permissions.Endpoint_Delete, endpoints, request.user) + endpoints = get_authorized_endpoints_for_queryset(Permissions.Endpoint_Delete, endpoints, request.user) skipped_endpoint_count = total_endpoint_count - endpoints.count() deleted_endpoint_count = endpoints.count() @@ -389,7 +389,7 @@ def endpoint_bulk_update_all(request, pid=None): product = get_object_or_404(Product, id=pid) user_has_permission_or_403(request.user, product, Permissions.Finding_Edit) - endpoints = get_authorized_endpoints(Permissions.Endpoint_Edit, endpoints, request.user) + endpoints = get_authorized_endpoints_for_queryset(Permissions.Endpoint_Edit, endpoints, request.user) skipped_endpoint_count = total_endpoint_count - endpoints.count() updated_endpoint_count = endpoints.count() diff --git a/dojo/engagement/queries.py b/dojo/engagement/queries.py index 97eeb31bdfa..cd720eb4251 100644 --- a/dojo/engagement/queries.py +++ b/dojo/engagement/queries.py @@ -1,10 +1,13 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Engagement, Product_Group, Product_Member, Product_Type_Group, Product_Type_Member +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_engagements(permission): user = get_current_user() @@ -18,27 +21,28 @@ def get_authorized_engagements(permission): return Engagement.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - engagements = Engagement.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return engagements.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Engagement.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") diff --git a/dojo/filters.py b/dojo/filters.py index 4ae5224dab6..adb4d0d6825 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -38,7 +38,7 @@ # from tagulous.forms import TagWidget # import tagulous from dojo.authorization.roles_permissions import Permissions -from dojo.endpoint.queries import get_authorized_endpoints +from dojo.endpoint.queries import get_authorized_endpoints_for_queryset from dojo.engagement.queries import get_authorized_engagements from dojo.finding.helper import ( ACCEPTED_FINDINGS_QUERY, @@ -52,8 +52,8 @@ VERIFIED_FINDINGS_QUERY, WAS_ACCEPTED_FINDINGS_QUERY, ) -from dojo.finding.queries import get_authorized_findings -from dojo.finding_group.queries import get_authorized_finding_groups +from dojo.finding.queries import get_authorized_findings_for_queryset +from dojo.finding_group.queries import get_authorized_finding_groups_for_queryset from dojo.labels import get_labels from dojo.models import ( EFFORT_FOR_FIXING_CHOICES, @@ -2098,7 +2098,7 @@ def set_related_object_fields(self, *args: list, **kwargs: dict): if self.form.fields.get("test__engagement__product"): self.form.fields["test__engagement__product"].queryset = get_authorized_products(Permissions.Product_View) if self.form.fields.get("finding_group", None): - self.form.fields["finding_group"].queryset = get_authorized_finding_groups(Permissions.Finding_Group_View, queryset=finding_group_query) + self.form.fields["finding_group"].queryset = get_authorized_finding_groups_for_queryset(Permissions.Finding_Group_View, finding_group_query) self.form.fields["reporter"].queryset = get_authorized_users(Permissions.Finding_View) self.form.fields["reviewers"].queryset = self.form.fields["reporter"].queryset @@ -2205,7 +2205,7 @@ def set_hash_codes(self, *args: list, **kwargs: dict): def filter_queryset(self, *args: list, **kwargs: dict): queryset = super().filter_queryset(*args, **kwargs) - queryset = get_authorized_findings(Permissions.Finding_View, queryset, self.user) + queryset = get_authorized_findings_for_queryset(Permissions.Finding_View, queryset, self.user) return queryset.exclude(pk=self.finding.pk) @@ -2751,7 +2751,7 @@ def __init__(self, *args, **kwargs): @property def qs(self): parent = super().qs - return get_authorized_endpoints(Permissions.Endpoint_View, parent) + return get_authorized_endpoints_for_queryset(Permissions.Endpoint_View, parent) class Meta: model = Endpoint @@ -2892,7 +2892,7 @@ def __init__(self, *args, **kwargs): @property def qs(self): parent = super().qs - return get_authorized_endpoints(Permissions.Endpoint_View, parent) + return get_authorized_endpoints_for_queryset(Permissions.Endpoint_View, parent) class Meta: model = Endpoint @@ -3240,7 +3240,7 @@ def manage_kwargs(self, kwargs): @property def qs(self): parent = super().qs - return get_authorized_findings(Permissions.Finding_View, parent) + return get_authorized_findings_for_queryset(Permissions.Finding_View, parent) class ReportFindingFilter(ReportFindingFilterHelper, FindingTagFilter): @@ -3260,7 +3260,7 @@ def __init__(self, *args, **kwargs): # duplicate_finding queryset needs to restricted in line with permissions # and inline with report scope to avoid a dropdown with 100K entries duplicate_finding_query_set = self.form.fields["duplicate_finding"].queryset - duplicate_finding_query_set = get_authorized_findings(Permissions.Finding_View, duplicate_finding_query_set) + duplicate_finding_query_set = get_authorized_findings_for_queryset(Permissions.Finding_View, duplicate_finding_query_set) if self.test: duplicate_finding_query_set = duplicate_finding_query_set.filter(test=self.test) diff --git a/dojo/finding/queries.py b/dojo/finding/queries.py index 5d186fdd2a5..adca2ef3bdf 100644 --- a/dojo/finding/queries.py +++ b/dojo/finding/queries.py @@ -2,7 +2,7 @@ from functools import partial from crum import get_current_user -from django.db.models import Exists, OuterRef, Q, Value +from django.db.models import OuterRef, Q, Subquery, Value from django.db.models.functions import Coalesce from django.db.models.query import Prefetch, QuerySet @@ -20,38 +20,57 @@ Vulnerability_Id, ) from dojo.query_utils import build_count_subquery +from dojo.request_cache import cache_for_request logger = logging.getLogger(__name__) -def get_authorized_groups(permission, user=None): +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_findings(permission, user=None): + """Cached - returns all findings the user is authorized to see.""" + if user is None: + user = get_current_user() + if user is None: + return Finding.objects.none() + findings = Finding.objects.all().order_by("id") + + if user.is_superuser: + return findings + + if user_has_global_permission(user, permission): + return findings + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("test__engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("test__engagement__product_id"), - group__users=user, - role__in=roles) - - return ( - authorized_product_type_roles, - authorized_product_roles, - authorized_product_type_groups, - authorized_product_groups, + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return findings.filter( + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), ) -def get_authorized_findings(permission, queryset=None, user=None): +def get_authorized_findings_for_queryset(permission, queryset, user=None): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" if user is None: user = get_current_user() if user is None: @@ -64,25 +83,36 @@ def get_authorized_findings(permission, queryset=None, user=None): if user_has_global_permission(user, permission): return findings - ( - authorized_product_type_roles, - authorized_product_roles, - authorized_product_type_groups, - authorized_product_groups, - ) = get_authorized_groups(permission, user=user) - - findings = findings.annotate( - test__engagement__product__prod_type__member=Exists(authorized_product_type_roles), - test__engagement__product__member=Exists(authorized_product_roles), - test__engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - test__engagement__product__authorized_group=Exists(authorized_product_groups)) + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return findings.filter( - Q(test__engagement__product__prod_type__member=True) - | Q(test__engagement__product__member=True) - | Q(test__engagement__product__prod_type__authorized_group=True) - | Q(test__engagement__product__authorized_group=True)) + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_stub_findings(permission): user = get_current_user() @@ -95,34 +125,45 @@ def get_authorized_stub_findings(permission): if user_has_global_permission(user, permission): return Stub_Finding.objects.all().order_by("id") - ( - authorized_product_type_roles, - authorized_product_roles, - authorized_product_type_groups, - authorized_product_groups, - ) = get_authorized_groups(permission, user=user) - - findings = Stub_Finding.objects.annotate( - test__engagement__product__prod_type__member=Exists(authorized_product_type_roles), - test__engagement__product__member=Exists(authorized_product_roles), - test__engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - test__engagement__product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return findings.filter( - Q(test__engagement__product__prod_type__member=True) - | Q(test__engagement__product__member=True) - | Q(test__engagement__product__prod_type__authorized_group=True) - | Q(test__engagement__product__authorized_group=True)) + roles = get_roles_for_permission(permission) + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") -def get_authorized_vulnerability_ids(permission, queryset=None, user=None): + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Stub_Finding.objects.filter( + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") + + +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_vulnerability_ids(permission, user=None): + """Cached - returns all vulnerability IDs the user is authorized to see.""" if user is None: user = get_current_user() if user is None: return Vulnerability_Id.objects.none() - vulnerability_ids = Vulnerability_Id.objects.all() if queryset is None else queryset + vulnerability_ids = Vulnerability_Id.objects.all() if user.is_superuser: return vulnerability_ids @@ -131,32 +172,73 @@ def get_authorized_vulnerability_ids(permission, queryset=None, user=None): return vulnerability_ids roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - group__users=user, - role__in=roles) - vulnerability_ids = vulnerability_ids.annotate( - finding__test__engagement__product__prod_type__member=Exists(authorized_product_type_roles), - finding__test__engagement__product__member=Exists(authorized_product_roles), - finding__test__engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - finding__test__engagement__product__authorized_group=Exists(authorized_product_groups)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return vulnerability_ids.filter( - Q(finding__test__engagement__product__prod_type__member=True) - | Q(finding__test__engagement__product__member=True) - | Q(finding__test__engagement__product__prod_type__authorized_group=True) - | Q(finding__test__engagement__product__authorized_group=True)) + Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) + + +def get_authorized_vulnerability_ids_for_queryset(permission, queryset, user=None): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" + if user is None: + user = get_current_user() + + if user is None: + return Vulnerability_Id.objects.none() + + if user.is_superuser: + return queryset + + if user_has_global_permission(user, permission): + return queryset + + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return queryset.filter( + Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) def prefetch_for_findings(findings, prefetch_type="all", *, exclude_untouched=True): diff --git a/dojo/finding/views.py b/dojo/finding/views.py index b5bfb593043..ef06cced0db 100644 --- a/dojo/finding/views.py +++ b/dojo/finding/views.py @@ -49,7 +49,7 @@ TestImportFilter, TestImportFindingActionFilter, ) -from dojo.finding.queries import get_authorized_findings, prefetch_for_findings +from dojo.finding.queries import get_authorized_findings, get_authorized_findings_for_queryset, prefetch_for_findings from dojo.forms import ( ApplyFindingTemplateForm, ClearFindingReviewForm, @@ -2641,7 +2641,7 @@ def _bulk_delete_findings(request, pid, form, finding_to_update, finds, total_fi request.user, product, Permissions.Finding_Delete, ) - finds = get_authorized_findings( + finds = get_authorized_findings_for_queryset( Permissions.Finding_Delete, finds, ).distinct() @@ -3047,7 +3047,7 @@ def finding_bulk_update_all(request, pid=None): ) # make sure users are not editing stuff they are not authorized for - finds = get_authorized_findings( + finds = get_authorized_findings_for_queryset( Permissions.Finding_Edit, finds, ).distinct() diff --git a/dojo/finding_group/queries.py b/dojo/finding_group/queries.py index 987cf7f6901..030342521b5 100644 --- a/dojo/finding_group/queries.py +++ b/dojo/finding_group/queries.py @@ -1,19 +1,22 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Finding_Group, Product_Group, Product_Member, Product_Type_Group, Product_Type_Member +from dojo.request_cache import cache_for_request -def get_authorized_finding_groups(permission, queryset=None, user=None): - +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request +def get_authorized_finding_groups(permission, user=None): + """Cached - returns all finding groups the user is authorized to see.""" if user is None: user = get_current_user() if user is None: return Finding_Group.objects.none() - finding_groups = Finding_Group.objects.all() if queryset is None else queryset + finding_groups = Finding_Group.objects.all() if user.is_superuser: return finding_groups @@ -22,29 +25,70 @@ def get_authorized_finding_groups(permission, queryset=None, user=None): return finding_groups roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("test__engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("test__engagement__product_id"), - group__users=user, - role__in=roles) - finding_groups = finding_groups.annotate( - test__engagement__product__prod_type__member=Exists(authorized_product_type_roles), - test__engagement__product__member=Exists(authorized_product_roles), - test__engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - test__engagement__product__authorized_group=Exists(authorized_product_groups)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed return finding_groups.filter( - Q(test__engagement__product__prod_type__member=True) - | Q(test__engagement__product__member=True) - | Q(test__engagement__product__prod_type__authorized_group=True) - | Q(test__engagement__product__authorized_group=True)) + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) + + +def get_authorized_finding_groups_for_queryset(permission, queryset, user=None): + """Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter.""" + if user is None: + user = get_current_user() + + if user is None: + return Finding_Group.objects.none() + + if user.is_superuser: + return queryset + + if user_has_global_permission(user, permission): + return queryset + + roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return queryset.filter( + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) diff --git a/dojo/group/queries.py b/dojo/group/queries.py index dedb0d35e14..deee04a346a 100644 --- a/dojo/group/queries.py +++ b/dojo/group/queries.py @@ -1,11 +1,14 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef +from django.db.models import Subquery from dojo.authorization.authorization import get_roles_for_permission from dojo.authorization.roles_permissions import Permissions from dojo.models import Dojo_Group, Dojo_Group_Member, Product_Group, Product_Type_Group, Role +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_groups(permission): user = get_current_user() @@ -16,11 +19,16 @@ def get_authorized_groups(permission): return Dojo_Group.objects.all().order_by("name") roles = get_roles_for_permission(permission) - authorized_roles = Dojo_Group_Member.objects.filter(group=OuterRef("pk"), - user=user, - role__in=roles) - groups = Dojo_Group.objects.annotate(user=Exists(authorized_roles)).order_by("name") - return groups.filter(user=True) + + # Get authorized group IDs via subquery + authorized_roles = Dojo_Group_Member.objects.filter( + user=user, role__in=roles, + ).values("group_id") + + # Filter using IN with Subquery - no annotations needed + return Dojo_Group.objects.filter( + pk__in=Subquery(authorized_roles), + ).order_by("name") def get_authorized_group_members(permission): diff --git a/dojo/jira_link/queries.py b/dojo/jira_link/queries.py index b077c076097..5ce281d2296 100644 --- a/dojo/jira_link/queries.py +++ b/dojo/jira_link/queries.py @@ -1,10 +1,13 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import JIRA_Issue, JIRA_Project, Product_Group, Product_Member, Product_Type_Group, Product_Type_Member +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_jira_projects(permission, user=None): if user is None: @@ -22,58 +25,42 @@ def get_authorized_jira_projects(permission, user=None): return jira_projects roles = get_roles_for_permission(permission) - engagement_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - user=user, - role__in=roles) - engagement_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("engagement__product_id"), - user=user, - role__in=roles) - engagement_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - engagement_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("engagement__product_id"), - group__users=user, - role__in=roles) - product_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) - product_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) - product_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) - product_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - jira_projects = jira_projects.annotate( - engagement__product__prod_type__member=Exists(engagement_authorized_product_type_roles), - engagement__product__member=Exists(engagement_authorized_product_roles), - engagement__product__prod_type__authorized_group=Exists(engagement_authorized_product_type_groups), - engagement__product__authorized_group=Exists(engagement_authorized_product_groups), - product__prod_type__member=Exists(product_authorized_product_type_roles), - product__member=Exists(product_authorized_product_roles), - product__prod_type__authorized_group=Exists(product_authorized_product_type_groups), - product__authorized_group=Exists(product_authorized_product_groups)) - return jira_projects.filter( - Q(engagement__product__prod_type__member=True) - | Q(engagement__product__member=True) - | Q(engagement__product__prod_type__authorized_group=True) - | Q(engagement__product__authorized_group=True) - | Q(product__prod_type__member=True) - | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) - | Q(product__authorized_group=True)) + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + # JIRA projects can be attached via engagement or product path + return jira_projects.filter( + # Engagement path + Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(engagement__product_id__in=Subquery(authorized_product_groups)) + # Product path + | Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ) + + +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_jira_issues(permission): user = get_current_user() @@ -89,77 +76,40 @@ def get_authorized_jira_issues(permission): return jira_issues roles = get_roles_for_permission(permission) - engagement_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - user=user, - role__in=roles) - engagement_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("engagement__product_id"), - user=user, - role__in=roles) - engagement_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - engagement_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("engagement__product_id"), - group__users=user, - role__in=roles) - finding_group_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("finding_group__test__engagement__product__prod_type_id"), - user=user, - role__in=roles) - finding_group_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("finding_group__test__engagement__product_id"), - user=user, - role__in=roles) - finding_group_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("finding_group__test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - finding_group_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("finding_group__test__engagement__product_id"), - group__users=user, - role__in=roles) - finding_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - user=user, - role__in=roles) - finding_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - user=user, - role__in=roles) - finding_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - finding_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - group__users=user, - role__in=roles) - jira_issues = jira_issues.annotate( - engagement__product__prod_type__member=Exists(engagement_authorized_product_type_roles), - engagement__product__member=Exists(engagement_authorized_product_roles), - engagement__product__prod_type__authorized_group=Exists(engagement_authorized_product_type_groups), - engagement__product__authorized_group=Exists(engagement_authorized_product_groups), - finding_group__test__engagement__product__prod_type__member=Exists(finding_group_authorized_product_type_roles), - finding_group__test__engagement__product__member=Exists(finding_group_authorized_product_roles), - finding_group__test__engagement__product__prod_type__authorized_group=Exists(finding_group_authorized_product_type_groups), - finding_group__test__engagement__product__authorized_group=Exists(finding_group_authorized_product_groups), - finding__test__engagement__product__prod_type__member=Exists(finding_authorized_product_type_roles), - finding__test__engagement__product__member=Exists(finding_authorized_product_roles), - finding__test__engagement__product__prod_type__authorized_group=Exists(finding_authorized_product_type_groups), - finding__test__engagement__product__authorized_group=Exists(finding_authorized_product_groups)) + + # Get authorized product/product_type IDs via subqueries + authorized_product_type_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + + authorized_product_roles = Product_Member.objects.filter( + user=user, role__in=roles, + ).values("product_id") + + authorized_product_type_groups = Product_Type_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_type_id") + + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + # JIRA issues can be attached via engagement, finding_group, or finding path return jira_issues.filter( - Q(engagement__product__prod_type__member=True) - | Q(engagement__product__member=True) - | Q(engagement__product__prod_type__authorized_group=True) - | Q(engagement__product__authorized_group=True) - | Q(finding_group__test__engagement__product__prod_type__member=True) - | Q(finding_group__test__engagement__product__member=True) - | Q(finding_group__test__engagement__product__prod_type__authorized_group=True) - | Q(finding_group__test__engagement__product__authorized_group=True) - | Q(finding__test__engagement__product__prod_type__member=True) - | Q(finding__test__engagement__product__member=True) - | Q(finding__test__engagement__product__prod_type__authorized_group=True) - | Q(finding__test__engagement__product__authorized_group=True)) + # Engagement path + Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(engagement__product_id__in=Subquery(authorized_product_groups)) + # Finding group path + | Q(finding_group__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(finding_group__test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(finding_group__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(finding_group__test__engagement__product_id__in=Subquery(authorized_product_groups)) + # Finding path + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(finding__test__engagement__product_id__in=Subquery(authorized_product_groups)), + ) diff --git a/dojo/metrics/utils.py b/dojo/metrics/utils.py index 3c7750e5130..7f931d5ad1f 100644 --- a/dojo/metrics/utils.py +++ b/dojo/metrics/utils.py @@ -17,7 +17,7 @@ from django.utils.translation import gettext as _ from dojo.authorization.roles_permissions import Permissions -from dojo.endpoint.queries import get_authorized_endpoint_status +from dojo.endpoint.queries import get_authorized_endpoint_status_for_queryset from dojo.filters import ( MetricsEndpointFilter, MetricsEndpointFilterWithoutObjectLookups, @@ -184,7 +184,7 @@ def endpoint_queries( "finding__reporter", ) - endpoints_query = get_authorized_endpoint_status(Permissions.Endpoint_View, endpoints_query, request.user) + endpoints_query = get_authorized_endpoint_status_for_queryset(Permissions.Endpoint_View, endpoints_query, request.user) filter_string_matching = get_system_setting("filter_string_matching", False) filter_class = MetricsEndpointFilterWithoutObjectLookups if filter_string_matching else MetricsEndpointFilter endpoints = filter_class(request.GET, queryset=endpoints_query) @@ -230,8 +230,8 @@ def endpoint_queries( "finding__test__engagement__product", ) - endpoints_closed = get_authorized_endpoint_status(Permissions.Endpoint_View, endpoints_closed, request.user) - accepted_endpoints = get_authorized_endpoint_status(Permissions.Endpoint_View, accepted_endpoints, request.user) + endpoints_closed = get_authorized_endpoint_status_for_queryset(Permissions.Endpoint_View, endpoints_closed, request.user) + accepted_endpoints = get_authorized_endpoint_status_for_queryset(Permissions.Endpoint_View, accepted_endpoints, request.user) accepted_endpoints_counts = severity_count(accepted_endpoints, "aggregate", "finding__severity") weeks_between, months_between = period_deltas(start_date, end_date) diff --git a/dojo/product/queries.py b/dojo/product/queries.py index 69532212a59..1b5efffef8c 100644 --- a/dojo/product/queries.py +++ b/dojo/product/queries.py @@ -1,5 +1,5 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import ( get_roles_for_permission, @@ -22,8 +22,11 @@ Product_Type_Group, Product_Type_Member, ) +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_products(permission, user=None): if user is None: @@ -39,30 +42,31 @@ def get_authorized_products(permission, user=None): return Product.objects.all().order_by("name") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("pk"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("pk"), - group__users=user, - role__in=roles) - products = Product.objects.annotate( - prod_type__member=Exists(authorized_product_type_roles), - member=Exists(authorized_product_roles), - prod_type__authorized_group=Exists(authorized_product_type_groups), - authorized_group=Exists(authorized_product_groups)).order_by("name") - return products.filter( - Q(prod_type__member=True) | Q(member=True) - | Q(prod_type__authorized_group=True) | Q(authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Product.objects.filter( + Q(prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(pk__in=Subquery(authorized_product_roles)) + | Q(prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(pk__in=Subquery(authorized_product_groups)), + ).order_by("name") def get_authorized_members_for_product(product, permission): @@ -143,6 +147,8 @@ def get_authorized_product_groups(permission): return Product_Group.objects.filter(product__in=products).order_by("id").select_related("role") +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_app_analysis(permission): user = get_current_user() @@ -156,32 +162,35 @@ def get_authorized_app_analysis(permission): return App_Analysis.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - app_analysis = App_Analysis.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return app_analysis.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return App_Analysis.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_dojo_meta(permission): user = get_current_user() @@ -195,83 +204,48 @@ def get_authorized_dojo_meta(permission): return DojoMeta.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries for all three paths + # Product path product_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + product_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + product_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + product_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - endpoint_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("endpoint__product__prod_type_id"), - user=user, - role__in=roles) - endpoint_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("endpoint__product_id"), - user=user, - role__in=roles) - endpoint_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("endpoint__product__prod_type_id"), - group__users=user, - role__in=roles) - endpoint_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("endpoint__product_id"), - group__users=user, - role__in=roles) - finding_authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - user=user, - role__in=roles) - finding_authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - user=user, - role__in=roles) - finding_authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("finding__test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - finding_authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("finding__test__engagement__product_id"), - group__users=user, - role__in=roles) - dojo_meta = DojoMeta.objects.annotate( - product__prod_type__member=Exists(product_authorized_product_type_roles), - product__member=Exists(product_authorized_product_roles), - product__prod_type__authorized_group=Exists(product_authorized_product_type_groups), - product__authorized_group=Exists(product_authorized_product_groups), - endpoint__product__prod_type__member=Exists(endpoint_authorized_product_type_roles), - endpoint__product__member=Exists(endpoint_authorized_product_roles), - endpoint__product__prod_type__authorized_group=Exists(endpoint_authorized_product_type_groups), - endpoint__product__authorized_group=Exists(endpoint_authorized_product_groups), - finding__test__engagement__product__prod_type__member=Exists(finding_authorized_product_type_roles), - finding__test__engagement__product__member=Exists(finding_authorized_product_roles), - finding__test__engagement__product__prod_type__authorized_group=Exists(finding_authorized_product_type_groups), - finding__test__engagement__product__authorized_group=Exists(finding_authorized_product_groups), + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + # DojoMeta can be attached to product, endpoint, or finding + return DojoMeta.objects.filter( + # Product path + Q(product__prod_type_id__in=Subquery(product_authorized_product_type_roles)) + | Q(product_id__in=Subquery(product_authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(product_authorized_product_type_groups)) + | Q(product_id__in=Subquery(product_authorized_product_groups)) + # Endpoint path + | Q(endpoint__product__prod_type_id__in=Subquery(product_authorized_product_type_roles)) + | Q(endpoint__product_id__in=Subquery(product_authorized_product_roles)) + | Q(endpoint__product__prod_type_id__in=Subquery(product_authorized_product_type_groups)) + | Q(endpoint__product_id__in=Subquery(product_authorized_product_groups)) + # Finding path + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(product_authorized_product_type_roles)) + | Q(finding__test__engagement__product_id__in=Subquery(product_authorized_product_roles)) + | Q(finding__test__engagement__product__prod_type_id__in=Subquery(product_authorized_product_type_groups)) + | Q(finding__test__engagement__product_id__in=Subquery(product_authorized_product_groups)), ).order_by("id") - return dojo_meta.filter( - Q(product__prod_type__member=True) - | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) - | Q(product__authorized_group=True) - | Q(endpoint__product__prod_type__member=True) - | Q(endpoint__product__member=True) - | Q(endpoint__product__prod_type__authorized_group=True) - | Q(endpoint__product__authorized_group=True) - | Q(finding__test__engagement__product__prod_type__member=True) - | Q(finding__test__engagement__product__member=True) - | Q(finding__test__engagement__product__prod_type__authorized_group=True) - | Q(finding__test__engagement__product__authorized_group=True)) +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_languages(permission): user = get_current_user() @@ -285,32 +259,35 @@ def get_authorized_languages(permission): return Languages.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - languages = Languages.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return languages.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Languages.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_engagement_presets(permission): user = get_current_user() @@ -324,32 +301,35 @@ def get_authorized_engagement_presets(permission): return Engagement_Presets.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - engagement_presets = Engagement_Presets.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return engagement_presets.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Engagement_Presets.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_product_api_scan_configurations(permission): user = get_current_user() @@ -363,27 +343,28 @@ def get_authorized_product_api_scan_configurations(permission): return Product_API_Scan_Configuration.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - product_api_scan_configurations = Product_API_Scan_Configuration.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return product_api_scan_configurations.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Product_API_Scan_Configuration.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") diff --git a/dojo/product_type/queries.py b/dojo/product_type/queries.py index 1d95ac81170..a0ae41e2590 100644 --- a/dojo/product_type/queries.py +++ b/dojo/product_type/queries.py @@ -1,5 +1,5 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import ( get_roles_for_permission, @@ -10,8 +10,11 @@ from dojo.authorization.roles_permissions import Permissions from dojo.group.queries import get_authorized_groups from dojo.models import Global_Role, Product_Type, Product_Type_Group, Product_Type_Member +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_product_types(permission): user = get_current_user() @@ -25,17 +28,21 @@ def get_authorized_product_types(permission): return Product_Type.objects.all().order_by("name") roles = get_roles_for_permission(permission) - authorized_roles = Product_Type_Member.objects.filter(product_type=OuterRef("pk"), - user=user, - role__in=roles) + + # Get authorized product_type IDs via subqueries + authorized_roles = Product_Type_Member.objects.filter( + user=user, role__in=roles, + ).values("product_type_id") + authorized_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("pk"), - group__users=user, - role__in=roles) - product_types = Product_Type.objects.annotate( - member=Exists(authorized_roles), - authorized_group=Exists(authorized_groups)).order_by("name") - return product_types.filter(Q(member=True) | Q(authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_type_id") + + # Filter using IN with Subquery - no annotations needed + return Product_Type.objects.filter( + Q(pk__in=Subquery(authorized_roles)) + | Q(pk__in=Subquery(authorized_groups)), + ).order_by("name") def get_authorized_members_for_product_type(product_type, permission): diff --git a/dojo/reports/views.py b/dojo/reports/views.py index 1dc29ff5e2d..ab4bee9bbd1 100644 --- a/dojo/reports/views.py +++ b/dojo/reports/views.py @@ -789,8 +789,6 @@ def generate_quick_report(self, request, findings, obj=None): def get_excludes(): return ["SEVERITIES", "age", "github_issue", "jira_issue", "objects", "risk_acceptance", - "test__engagement__product__authorized_group", "test__engagement__product__member", - "test__engagement__product__prod_type__authorized_group", "test__engagement__product__prod_type__member", "unsaved_endpoints", "unsaved_vulnerability_ids", "unsaved_files", "unsaved_request", "unsaved_response", "unsaved_tags", "vulnerability_ids", "cve"] diff --git a/dojo/risk_acceptance/queries.py b/dojo/risk_acceptance/queries.py index 72282af21e7..a608a4d174b 100644 --- a/dojo/risk_acceptance/queries.py +++ b/dojo/risk_acceptance/queries.py @@ -1,10 +1,13 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Product_Group, Product_Member, Product_Type_Group, Product_Type_Member, Risk_Acceptance +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_risk_acceptances(permission): user = get_current_user() @@ -18,27 +21,28 @@ def get_authorized_risk_acceptances(permission): return Risk_Acceptance.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("engagement__product_id"), - group__users=user, - role__in=roles) - risk_acceptances = Risk_Acceptance.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return risk_acceptances.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Risk_Acceptance.objects.filter( + Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(engagement__product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") diff --git a/dojo/test/queries.py b/dojo/test/queries.py index 28a9249d543..89089efbf1d 100644 --- a/dojo/test/queries.py +++ b/dojo/test/queries.py @@ -1,10 +1,13 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Product_Group, Product_Member, Product_Type_Group, Product_Type_Member, Test, Test_Import +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_tests(permission, product=None): user = get_current_user() @@ -22,37 +25,35 @@ def get_authorized_tests(permission, product=None): return Test.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("engagement__product__prod_type_id"), - group__users=user, - role__in=roles) - authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("engagement__product_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") - tests = tests.annotate( - engagement__product__prod_type__member=Exists(authorized_product_type_roles), - engagement__product__member=Exists(authorized_product_roles), - engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - engagement__product__authorized_group=Exists(authorized_product_groups)) + authorized_product_groups = Product_Group.objects.filter( + group__users=user, role__in=roles, + ).values("product_id") + # Filter using IN with Subquery - no annotations needed return tests.filter( - Q(engagement__product__prod_type__member=True) - | Q(engagement__product__member=True) - | Q(engagement__product__prod_type__authorized_group=True) - | Q(engagement__product__authorized_group=True)) + Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(engagement__product_id__in=Subquery(authorized_product_groups)), + ) +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_test_imports(permission): user = get_current_user() @@ -66,29 +67,28 @@ def get_authorized_test_imports(permission): return Test_Import.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("test__engagement__product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("test__engagement__product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("test__engagement__product_id"), - group__users=user, - role__in=roles) - test_imports = Test_Import.objects.annotate( - test__engagement__product__prod_type__member=Exists(authorized_product_type_roles), - test__engagement__product__member=Exists(authorized_product_roles), - test__engagement__product__prod_type__authorized_group=Exists(authorized_product_type_groups), - test__engagement__product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return test_imports.filter( - Q(test__engagement__product__prod_type__member=True) - | Q(test__engagement__product__member=True) - | Q(test__engagement__product__prod_type__authorized_group=True) - | Q(test__engagement__product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Test_Import.objects.filter( + Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_roles)) + | Q(test__engagement__product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(test__engagement__product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") diff --git a/dojo/tool_product/queries.py b/dojo/tool_product/queries.py index df95594688b..d8979fa5066 100644 --- a/dojo/tool_product/queries.py +++ b/dojo/tool_product/queries.py @@ -1,10 +1,13 @@ from crum import get_current_user -from django.db.models import Exists, OuterRef, Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import Product_Group, Product_Member, Product_Type_Group, Product_Type_Member, Tool_Product_Settings +from dojo.request_cache import cache_for_request +# Cached: all parameters are hashable, no dynamic queryset filtering +@cache_for_request def get_authorized_tool_product_settings(permission): user = get_current_user() @@ -18,27 +21,28 @@ def get_authorized_tool_product_settings(permission): return Tool_Product_Settings.objects.all().order_by("id") roles = get_roles_for_permission(permission) + + # Get authorized product/product_type IDs via subqueries authorized_product_type_roles = Product_Type_Member.objects.filter( - product_type=OuterRef("product__prod_type_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_type_id") + authorized_product_roles = Product_Member.objects.filter( - product=OuterRef("product_id"), - user=user, - role__in=roles) + user=user, role__in=roles, + ).values("product_id") + authorized_product_type_groups = Product_Type_Group.objects.filter( - product_type=OuterRef("product__prod_type_id"), - group__users=user, - role__in=roles) + group__users=user, role__in=roles, + ).values("product_type_id") + authorized_product_groups = Product_Group.objects.filter( - product=OuterRef("product_id"), - group__users=user, - role__in=roles) - tool_product_settings = Tool_Product_Settings.objects.annotate( - product__prod_type__member=Exists(authorized_product_type_roles), - product__member=Exists(authorized_product_roles), - product__prod_type__authorized_group=Exists(authorized_product_type_groups), - product__authorized_group=Exists(authorized_product_groups)).order_by("id") - return tool_product_settings.filter( - Q(product__prod_type__member=True) | Q(product__member=True) - | Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True)) + group__users=user, role__in=roles, + ).values("product_id") + + # Filter using IN with Subquery - no annotations needed + return Tool_Product_Settings.objects.filter( + Q(product__prod_type_id__in=Subquery(authorized_product_type_roles)) + | Q(product_id__in=Subquery(authorized_product_roles)) + | Q(product__prod_type_id__in=Subquery(authorized_product_type_groups)) + | Q(product_id__in=Subquery(authorized_product_groups)), + ).order_by("id") diff --git a/dojo/user/queries.py b/dojo/user/queries.py index 5b9227e51bb..85f04b281e4 100644 --- a/dojo/user/queries.py +++ b/dojo/user/queries.py @@ -1,5 +1,5 @@ from crum import get_current_user -from django.db.models import Q +from django.db.models import Q, Subquery from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission from dojo.models import ( @@ -18,21 +18,33 @@ def get_authorized_users_for_product_type(users, product_type, permission): roles = get_roles_for_permission(permission) - product_type_members = Product_Type_Member.objects \ - .filter(product_type=product_type, role__in=roles) \ - .select_related("user") - product_type_groups = Product_Type_Group.objects \ - .filter(product_type=product_type, role__in=roles) - global_roles = Global_Role.objects.filter(role__in=roles) - group_members = Dojo_Group_Member.objects \ - .filter(Q(group__in=[ptg.group for ptg in product_type_groups]) - | Q(group__in=[gr.group for gr in global_roles])) \ - .select_related("user") - - return users.filter(Q(id__in=[ptm.user.id for ptm in product_type_members]) - | Q(id__in=[gm.user.id for gm in group_members]) + + # Get user IDs via subqueries instead of materializing into Python lists + product_type_member_users = Product_Type_Member.objects.filter( + product_type=product_type, role__in=roles, + ).values("user_id") + + # Get group IDs that have access to this product type + product_type_group_ids = Product_Type_Group.objects.filter( + product_type=product_type, role__in=roles, + ).values("group_id") + + global_role_group_ids = Global_Role.objects.filter( + role__in=roles, group__isnull=False, + ).values("group_id") + + # Get users from those groups + group_member_users = Dojo_Group_Member.objects.filter( + Q(group_id__in=Subquery(product_type_group_ids)) + | Q(group_id__in=Subquery(global_role_group_ids)), + ).values("user_id") + + return users.filter( + Q(id__in=Subquery(product_type_member_users)) + | Q(id__in=Subquery(group_member_users)) | Q(global_role__role__in=roles) - | Q(is_superuser=True)) + | Q(is_superuser=True), + ) def get_authorized_users_for_product_and_product_type(users, product, permission): @@ -41,29 +53,42 @@ def get_authorized_users_for_product_and_product_type(users, product, permission roles = get_roles_for_permission(permission) - product_members = Product_Member.objects \ - .filter(product=product, role__in=roles) \ - .select_related("user") - product_type_members = Product_Type_Member.objects \ - .filter(product_type=product.prod_type, role__in=roles) \ - .select_related("user") - product_groups = Product_Group.objects \ - .filter(product=product, role__in=roles) - product_type_groups = Product_Type_Group.objects \ - .filter(product_type=product.prod_type, role__in=roles) - global_roles = Global_Role.objects.filter(role__in=roles) - group_members = Dojo_Group_Member.objects \ - .filter( - Q(group__in=[pg.group for pg in product_groups]) - | Q(group__in=[ptg.group for ptg in product_type_groups]) - | Q(group__in=[gr.group for gr in global_roles])) \ - .select_related("user") - - return users.filter(Q(id__in=[pm.user.id for pm in product_members]) - | Q(id__in=[ptm.user.id for ptm in product_type_members]) - | Q(id__in=[gm.user.id for gm in group_members]) + # Get user IDs via subqueries instead of materializing into Python lists + product_member_users = Product_Member.objects.filter( + product=product, role__in=roles, + ).values("user_id") + + product_type_member_users = Product_Type_Member.objects.filter( + product_type=product.prod_type, role__in=roles, + ).values("user_id") + + # Get group IDs that have access to this product or product type + product_group_ids = Product_Group.objects.filter( + product=product, role__in=roles, + ).values("group_id") + + product_type_group_ids = Product_Type_Group.objects.filter( + product_type=product.prod_type, role__in=roles, + ).values("group_id") + + global_role_group_ids = Global_Role.objects.filter( + role__in=roles, group__isnull=False, + ).values("group_id") + + # Get users from those groups + group_member_users = Dojo_Group_Member.objects.filter( + Q(group_id__in=Subquery(product_group_ids)) + | Q(group_id__in=Subquery(product_type_group_ids)) + | Q(group_id__in=Subquery(global_role_group_ids)), + ).values("user_id") + + return users.filter( + Q(id__in=Subquery(product_member_users)) + | Q(id__in=Subquery(product_type_member_users)) + | Q(id__in=Subquery(group_member_users)) | Q(global_role__role__in=roles) - | Q(is_superuser=True)) + | Q(is_superuser=True), + ) # Cached because it is a complex SQL query and it is called 3 times for the engagement lists in products @@ -87,23 +112,35 @@ def get_authorized_users(permission, user=None): authorized_product_types = get_authorized_product_types(permission).values("id") roles = get_roles_for_permission(permission) - product_members = Product_Member.objects \ - .filter(product_id__in=authorized_products, role__in=roles) \ - .select_related("user") - product_type_members = Product_Type_Member.objects \ - .filter(product_type_id__in=authorized_product_types, role__in=roles) \ - .select_related("user") - product_groups = Product_Group.objects \ - .filter(product_id__in=authorized_products, role__in=roles) - product_type_groups = Product_Type_Group.objects \ - .filter(product_type_id__in=authorized_product_types, role__in=roles) - group_members = Dojo_Group_Member.objects \ - .filter( - Q(group__in=[pg.group for pg in product_groups]) - | Q(group__in=[ptg.group for ptg in product_type_groups])) \ - .select_related("user") - return users.filter(Q(id__in=[pm.user.id for pm in product_members]) - | Q(id__in=[ptm.user.id for ptm in product_type_members]) - | Q(id__in=[gm.user.id for gm in group_members]) + + # Get user IDs via subqueries instead of materializing into Python lists + product_member_users = Product_Member.objects.filter( + product_id__in=Subquery(authorized_products), role__in=roles, + ).values("user_id") + + product_type_member_users = Product_Type_Member.objects.filter( + product_type_id__in=Subquery(authorized_product_types), role__in=roles, + ).values("user_id") + + # Get group IDs that have access to authorized products/product types + product_group_ids = Product_Group.objects.filter( + product_id__in=Subquery(authorized_products), role__in=roles, + ).values("group_id") + + product_type_group_ids = Product_Type_Group.objects.filter( + product_type_id__in=Subquery(authorized_product_types), role__in=roles, + ).values("group_id") + + # Get users from those groups + group_member_users = Dojo_Group_Member.objects.filter( + Q(group_id__in=Subquery(product_group_ids)) + | Q(group_id__in=Subquery(product_type_group_ids)), + ).values("user_id") + + return users.filter( + Q(id__in=Subquery(product_member_users)) + | Q(id__in=Subquery(product_type_member_users)) + | Q(id__in=Subquery(group_member_users)) | Q(global_role__role__in=roles) - | Q(is_superuser=True)) + | Q(is_superuser=True), + ) diff --git a/unittests/test_authorization_queries.py b/unittests/test_authorization_queries.py new file mode 100644 index 00000000000..a1fc690791b --- /dev/null +++ b/unittests/test_authorization_queries.py @@ -0,0 +1,657 @@ +""" +Unit tests for get_authorized_*() query functions. + +Tests the query functions that filter querysets based on user permissions. +These tests verify that the authorization queries return correct results +for various user permission scenarios. +""" +from unittest.mock import patch + +from django.utils import timezone + +from dojo.authorization.roles_permissions import Permissions +from dojo.endpoint.queries import get_authorized_endpoint_status, get_authorized_endpoints +from dojo.engagement.queries import get_authorized_engagements +from dojo.finding.queries import ( + get_authorized_findings, + get_authorized_findings_for_queryset, + get_authorized_stub_findings, + get_authorized_vulnerability_ids, +) +from dojo.finding_group.queries import get_authorized_finding_groups +from dojo.group.queries import get_authorized_groups +from dojo.models import ( + Dojo_Group, + Dojo_Group_Member, + Dojo_User, + Endpoint, + Endpoint_Status, + Engagement, + Finding, + Finding_Group, + Global_Role, + Product, + Product_Group, + Product_Member, + Product_Type, + Product_Type_Group, + Product_Type_Member, + Role, + Stub_Finding, + Test, + Test_Type, + Vulnerability_Id, +) +from dojo.product.queries import get_authorized_products +from dojo.product_type.queries import get_authorized_product_types +from dojo.test.queries import get_authorized_tests + +from .dojo_test_case import DojoTestCase + + +class AuthorizationQueriesTestBase(DojoTestCase): + + """Base class with common test data setup for authorization query tests.""" + + @classmethod + def setUpTestData(cls): + # Create roles reference + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + cls.owner_role = Role.objects.get(name="Owner") + + # Get or create test users - use get_or_create to avoid duplicates + cls.superuser, _ = Dojo_User.objects.get_or_create( + username="auth_test_superuser", + defaults={"is_superuser": True, "is_active": True}, + ) + cls.superuser.is_superuser = True + cls.superuser.save() + + cls.user_no_perms, _ = Dojo_User.objects.get_or_create( + username="auth_test_no_perms", + defaults={"is_active": True}, + ) + cls.user_global_reader, _ = Dojo_User.objects.get_or_create( + username="auth_test_global_reader", + defaults={"is_active": True}, + ) + cls.user_product_member, _ = Dojo_User.objects.get_or_create( + username="auth_test_product_member", + defaults={"is_active": True}, + ) + cls.user_product_type_member, _ = Dojo_User.objects.get_or_create( + username="auth_test_product_type_member", + defaults={"is_active": True}, + ) + cls.user_group_product_member, _ = Dojo_User.objects.get_or_create( + username="auth_test_group_product_member", + defaults={"is_active": True}, + ) + cls.user_group_product_type_member, _ = Dojo_User.objects.get_or_create( + username="auth_test_group_product_type_member", + defaults={"is_active": True}, + ) + + # Create global role for global reader (get_or_create to avoid duplicates) + Global_Role.objects.get_or_create( + user=cls.user_global_reader, + defaults={"role": cls.reader_role}, + ) + + # Create product types + cls.product_type_1, _ = Product_Type.objects.get_or_create(name="Auth Test PT 1") + cls.product_type_2, _ = Product_Type.objects.get_or_create(name="Auth Test PT 2") + + # Create products + cls.product_1, _ = Product.objects.get_or_create( + name="Auth Test Product 1", + defaults={"prod_type": cls.product_type_1}, + ) + cls.product_2, _ = Product.objects.get_or_create( + name="Auth Test Product 2", + defaults={"prod_type": cls.product_type_2}, + ) + + # Create product membership for user_product_member (only to product_1) + Product_Member.objects.get_or_create( + user=cls.user_product_member, + product=cls.product_1, + defaults={"role": cls.reader_role}, + ) + + # Create product type membership for user_product_type_member (only to product_type_1) + Product_Type_Member.objects.get_or_create( + user=cls.user_product_type_member, + product_type=cls.product_type_1, + defaults={"role": cls.reader_role}, + ) + + # Create groups for group-based access + cls.group_product, _ = Dojo_Group.objects.get_or_create(name="Auth Test Group Product") + cls.group_product_type, _ = Dojo_Group.objects.get_or_create(name="Auth Test Group Product Type") + + # Add users to groups + Dojo_Group_Member.objects.get_or_create( + user=cls.user_group_product_member, + group=cls.group_product, + defaults={"role": cls.reader_role}, + ) + Dojo_Group_Member.objects.get_or_create( + user=cls.user_group_product_type_member, + group=cls.group_product_type, + defaults={"role": cls.reader_role}, + ) + + # Create product group membership (group_product -> product_1) + Product_Group.objects.get_or_create( + product=cls.product_1, + group=cls.group_product, + defaults={"role": cls.reader_role}, + ) + + # Create product type group membership (group_product_type -> product_type_1) + Product_Type_Group.objects.get_or_create( + product_type=cls.product_type_1, + group=cls.group_product_type, + defaults={"role": cls.reader_role}, + ) + + # Create test type + cls.test_type, _ = Test_Type.objects.get_or_create(name="Auth Test Type") + + # Create engagements + cls.engagement_1, _ = Engagement.objects.get_or_create( + name="Auth Test Engagement 1", + product=cls.product_1, + defaults={ + "target_start": timezone.now(), + "target_end": timezone.now(), + }, + ) + cls.engagement_2, _ = Engagement.objects.get_or_create( + name="Auth Test Engagement 2", + product=cls.product_2, + defaults={ + "target_start": timezone.now(), + "target_end": timezone.now(), + }, + ) + + # Create tests + cls.test_1, _ = Test.objects.get_or_create( + engagement=cls.engagement_1, + test_type=cls.test_type, + defaults={ + "target_start": timezone.now(), + "target_end": timezone.now(), + }, + ) + cls.test_2, _ = Test.objects.get_or_create( + engagement=cls.engagement_2, + test_type=cls.test_type, + defaults={ + "target_start": timezone.now(), + "target_end": timezone.now(), + }, + ) + + # Create findings - reporter is required + cls.finding_1, _ = Finding.objects.get_or_create( + test=cls.test_1, + title="Auth Test Finding 1", + defaults={ + "severity": "High", + "active": True, + "verified": True, + "numerical_severity": "S1", + "reporter": cls.superuser, + }, + ) + cls.finding_2, _ = Finding.objects.get_or_create( + test=cls.test_2, + title="Auth Test Finding 2", + defaults={ + "severity": "Medium", + "active": True, + "verified": True, + "numerical_severity": "S2", + "reporter": cls.superuser, + }, + ) + + # Create stub findings - reporter is required + cls.stub_finding_1, _ = Stub_Finding.objects.get_or_create( + test=cls.test_1, + title="Auth Test Stub Finding 1", + defaults={ + "severity": "High", + "reporter": cls.superuser, + }, + ) + cls.stub_finding_2, _ = Stub_Finding.objects.get_or_create( + test=cls.test_2, + title="Auth Test Stub Finding 2", + defaults={ + "severity": "Medium", + "reporter": cls.superuser, + }, + ) + + # Create vulnerability IDs + cls.vuln_id_1, _ = Vulnerability_Id.objects.get_or_create( + finding=cls.finding_1, + vulnerability_id="CVE-2024-0001", + ) + cls.vuln_id_2, _ = Vulnerability_Id.objects.get_or_create( + finding=cls.finding_2, + vulnerability_id="CVE-2024-0002", + ) + + # Create endpoints + cls.endpoint_1, _ = Endpoint.objects.get_or_create( + product=cls.product_1, + host="auth-test-1.example.com", + ) + cls.endpoint_2, _ = Endpoint.objects.get_or_create( + product=cls.product_2, + host="auth-test-2.example.com", + ) + + # Create endpoint statuses + cls.endpoint_status_1, _ = Endpoint_Status.objects.get_or_create( + endpoint=cls.endpoint_1, + finding=cls.finding_1, + ) + cls.endpoint_status_2, _ = Endpoint_Status.objects.get_or_create( + endpoint=cls.endpoint_2, + finding=cls.finding_2, + ) + + +class TestGetAuthorizedFindings(AuthorizationQueriesTestBase): + + """Tests for get_authorized_findings()""" + + def test_superuser_gets_all_findings(self): + """Superuser should get all findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.superuser) + self.assertIn(self.finding_1, findings) + self.assertIn(self.finding_2, findings) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_no_perms) + self.assertNotIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_user_global_reader_gets_all(self): + """User with global reader role should get all findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_global_reader) + self.assertIn(self.finding_1, findings) + self.assertIn(self.finding_2, findings) + + def test_user_product_member_gets_product_findings(self): + """User with product membership should get only that product's findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_product_member) + self.assertIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_user_product_type_member_gets_product_type_findings(self): + """User with product type membership should get all findings in that product type""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_product_type_member) + self.assertIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_user_group_product_member_gets_group_findings(self): + """User in group with product access should get those findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_group_product_member) + self.assertIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_user_group_product_type_member_gets_group_findings(self): + """User in group with product type access should get those findings""" + findings = get_authorized_findings(Permissions.Finding_View, user=self.user_group_product_type_member) + self.assertIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_queryset_parameter_filters_correctly(self): + """Passing a queryset should filter within that queryset""" + base_queryset = Finding.objects.filter(severity="High") + findings = get_authorized_findings_for_queryset(Permissions.Finding_View, base_queryset, user=self.superuser) + self.assertIn(self.finding_1, findings) + self.assertNotIn(self.finding_2, findings) + + def test_none_user_returns_empty(self): + """None user should return empty queryset""" + with patch("dojo.finding.queries.get_current_user", return_value=None): + findings = get_authorized_findings(Permissions.Finding_View) + self.assertEqual(findings.count(), 0) + + +class TestGetAuthorizedStubFindings(AuthorizationQueriesTestBase): + + """Tests for get_authorized_stub_findings() - uses get_current_user()""" + + @patch("dojo.finding.queries.get_current_user") + def test_superuser_gets_all_stub_findings(self, mock_get_current_user): + """Superuser should get all stub findings""" + mock_get_current_user.return_value = self.superuser + stub_findings = get_authorized_stub_findings(Permissions.Finding_View) + self.assertIn(self.stub_finding_1, stub_findings) + self.assertIn(self.stub_finding_2, stub_findings) + + @patch("dojo.finding.queries.get_current_user") + def test_user_no_permissions_gets_empty(self, mock_get_current_user): + """User with no permissions should not get test stub findings""" + mock_get_current_user.return_value = self.user_no_perms + stub_findings = get_authorized_stub_findings(Permissions.Finding_View) + self.assertNotIn(self.stub_finding_1, stub_findings) + self.assertNotIn(self.stub_finding_2, stub_findings) + + @patch("dojo.finding.queries.get_current_user") + def test_user_product_member_gets_product_stub_findings(self, mock_get_current_user): + """User with product membership should get only that product's stub findings""" + mock_get_current_user.return_value = self.user_product_member + stub_findings = get_authorized_stub_findings(Permissions.Finding_View) + self.assertIn(self.stub_finding_1, stub_findings) + self.assertNotIn(self.stub_finding_2, stub_findings) + + +class TestGetAuthorizedVulnerabilityIds(AuthorizationQueriesTestBase): + + """Tests for get_authorized_vulnerability_ids()""" + + def test_superuser_gets_all_vulnerability_ids(self): + """Superuser should get all vulnerability IDs""" + vuln_ids = get_authorized_vulnerability_ids(Permissions.Finding_View, user=self.superuser) + self.assertIn(self.vuln_id_1, vuln_ids) + self.assertIn(self.vuln_id_2, vuln_ids) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test vulnerability IDs""" + vuln_ids = get_authorized_vulnerability_ids(Permissions.Finding_View, user=self.user_no_perms) + self.assertNotIn(self.vuln_id_1, vuln_ids) + self.assertNotIn(self.vuln_id_2, vuln_ids) + + def test_user_product_member_gets_product_vulnerability_ids(self): + """User with product membership should get only that product's vulnerability IDs""" + vuln_ids = get_authorized_vulnerability_ids(Permissions.Finding_View, user=self.user_product_member) + self.assertIn(self.vuln_id_1, vuln_ids) + self.assertNotIn(self.vuln_id_2, vuln_ids) + + +class TestGetAuthorizedProducts(AuthorizationQueriesTestBase): + + """Tests for get_authorized_products()""" + + def test_superuser_gets_all_products(self): + """Superuser should get all products""" + products = get_authorized_products(Permissions.Product_View, user=self.superuser) + self.assertIn(self.product_1, products) + self.assertIn(self.product_2, products) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test products""" + products = get_authorized_products(Permissions.Product_View, user=self.user_no_perms) + self.assertNotIn(self.product_1, products) + self.assertNotIn(self.product_2, products) + + def test_user_global_reader_gets_all(self): + """User with global reader role should get all products""" + products = get_authorized_products(Permissions.Product_View, user=self.user_global_reader) + self.assertIn(self.product_1, products) + self.assertIn(self.product_2, products) + + def test_user_product_member_gets_own_products(self): + """User with product membership should get only that product""" + products = get_authorized_products(Permissions.Product_View, user=self.user_product_member) + self.assertIn(self.product_1, products) + self.assertNotIn(self.product_2, products) + + def test_user_product_type_member_gets_type_products(self): + """User with product type membership should get products in that type""" + products = get_authorized_products(Permissions.Product_View, user=self.user_product_type_member) + self.assertIn(self.product_1, products) + self.assertNotIn(self.product_2, products) + + def test_user_group_product_member_gets_group_products(self): + """User in group with product access should get those products""" + products = get_authorized_products(Permissions.Product_View, user=self.user_group_product_member) + self.assertIn(self.product_1, products) + self.assertNotIn(self.product_2, products) + + def test_user_group_product_type_member_gets_group_products(self): + """User in group with product type access should get products in that type""" + products = get_authorized_products(Permissions.Product_View, user=self.user_group_product_type_member) + self.assertIn(self.product_1, products) + self.assertNotIn(self.product_2, products) + + +class TestGetAuthorizedProductTypes(AuthorizationQueriesTestBase): + + """Tests for get_authorized_product_types() - uses get_current_user()""" + + @patch("dojo.product_type.queries.get_current_user") + def test_superuser_gets_all_product_types(self, mock_get_current_user): + """Superuser should get all product types""" + mock_get_current_user.return_value = self.superuser + product_types = get_authorized_product_types(Permissions.Product_Type_View) + self.assertIn(self.product_type_1, product_types) + self.assertIn(self.product_type_2, product_types) + + @patch("dojo.product_type.queries.get_current_user") + def test_user_no_permissions_gets_empty(self, mock_get_current_user): + """User with no permissions should not get test product types""" + mock_get_current_user.return_value = self.user_no_perms + product_types = get_authorized_product_types(Permissions.Product_Type_View) + self.assertNotIn(self.product_type_1, product_types) + self.assertNotIn(self.product_type_2, product_types) + + @patch("dojo.product_type.queries.get_current_user") + def test_user_global_reader_gets_all(self, mock_get_current_user): + """User with global reader role should get all product types""" + mock_get_current_user.return_value = self.user_global_reader + product_types = get_authorized_product_types(Permissions.Product_Type_View) + self.assertIn(self.product_type_1, product_types) + self.assertIn(self.product_type_2, product_types) + + @patch("dojo.product_type.queries.get_current_user") + def test_user_product_type_member_gets_own_types(self, mock_get_current_user): + """User with product type membership should get only that type""" + mock_get_current_user.return_value = self.user_product_type_member + product_types = get_authorized_product_types(Permissions.Product_Type_View) + self.assertIn(self.product_type_1, product_types) + self.assertNotIn(self.product_type_2, product_types) + + @patch("dojo.product_type.queries.get_current_user") + def test_user_group_product_type_member_gets_group_types(self, mock_get_current_user): + """User in group with product type access should get that type""" + mock_get_current_user.return_value = self.user_group_product_type_member + product_types = get_authorized_product_types(Permissions.Product_Type_View) + self.assertIn(self.product_type_1, product_types) + self.assertNotIn(self.product_type_2, product_types) + + +class TestGetAuthorizedEngagements(AuthorizationQueriesTestBase): + + """Tests for get_authorized_engagements() - uses get_current_user()""" + + @patch("dojo.engagement.queries.get_current_user") + def test_superuser_gets_all_engagements(self, mock_get_current_user): + """Superuser should get all engagements""" + mock_get_current_user.return_value = self.superuser + engagements = get_authorized_engagements(Permissions.Engagement_View) + self.assertIn(self.engagement_1, engagements) + self.assertIn(self.engagement_2, engagements) + + @patch("dojo.engagement.queries.get_current_user") + def test_user_no_permissions_gets_empty(self, mock_get_current_user): + """User with no permissions should not get test engagements""" + mock_get_current_user.return_value = self.user_no_perms + engagements = get_authorized_engagements(Permissions.Engagement_View) + self.assertNotIn(self.engagement_1, engagements) + self.assertNotIn(self.engagement_2, engagements) + + @patch("dojo.engagement.queries.get_current_user") + def test_user_global_reader_gets_all(self, mock_get_current_user): + """User with global reader role should get all engagements""" + mock_get_current_user.return_value = self.user_global_reader + engagements = get_authorized_engagements(Permissions.Engagement_View) + self.assertIn(self.engagement_1, engagements) + self.assertIn(self.engagement_2, engagements) + + @patch("dojo.engagement.queries.get_current_user") + def test_user_product_member_gets_product_engagements(self, mock_get_current_user): + """User with product membership should get only that product's engagements""" + mock_get_current_user.return_value = self.user_product_member + engagements = get_authorized_engagements(Permissions.Engagement_View) + self.assertIn(self.engagement_1, engagements) + self.assertNotIn(self.engagement_2, engagements) + + @patch("dojo.engagement.queries.get_current_user") + def test_user_product_type_member_gets_product_type_engagements(self, mock_get_current_user): + """User with product type membership should get engagements in that type""" + mock_get_current_user.return_value = self.user_product_type_member + engagements = get_authorized_engagements(Permissions.Engagement_View) + self.assertIn(self.engagement_1, engagements) + self.assertNotIn(self.engagement_2, engagements) + + +class TestGetAuthorizedTests(AuthorizationQueriesTestBase): + + """Tests for get_authorized_tests() - uses get_current_user()""" + + @patch("dojo.test.queries.get_current_user") + def test_superuser_gets_all_tests(self, mock_get_current_user): + """Superuser should get all tests""" + mock_get_current_user.return_value = self.superuser + tests = get_authorized_tests(Permissions.Test_View) + self.assertIn(self.test_1, tests) + self.assertIn(self.test_2, tests) + + @patch("dojo.test.queries.get_current_user") + def test_user_no_permissions_gets_empty(self, mock_get_current_user): + """User with no permissions should not get test tests""" + mock_get_current_user.return_value = self.user_no_perms + tests = get_authorized_tests(Permissions.Test_View) + self.assertNotIn(self.test_1, tests) + self.assertNotIn(self.test_2, tests) + + @patch("dojo.test.queries.get_current_user") + def test_user_product_member_gets_product_tests(self, mock_get_current_user): + """User with product membership should get only that product's tests""" + mock_get_current_user.return_value = self.user_product_member + tests = get_authorized_tests(Permissions.Test_View) + self.assertIn(self.test_1, tests) + self.assertNotIn(self.test_2, tests) + + +class TestGetAuthorizedEndpoints(AuthorizationQueriesTestBase): + + """Tests for get_authorized_endpoints()""" + + def test_superuser_gets_all_endpoints(self): + """Superuser should get all endpoints""" + endpoints = get_authorized_endpoints(Permissions.Endpoint_View, user=self.superuser) + self.assertIn(self.endpoint_1, endpoints) + self.assertIn(self.endpoint_2, endpoints) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test endpoints""" + endpoints = get_authorized_endpoints(Permissions.Endpoint_View, user=self.user_no_perms) + self.assertNotIn(self.endpoint_1, endpoints) + self.assertNotIn(self.endpoint_2, endpoints) + + def test_user_product_member_gets_product_endpoints(self): + """User with product membership should get only that product's endpoints""" + endpoints = get_authorized_endpoints(Permissions.Endpoint_View, user=self.user_product_member) + self.assertIn(self.endpoint_1, endpoints) + self.assertNotIn(self.endpoint_2, endpoints) + + +class TestGetAuthorizedEndpointStatus(AuthorizationQueriesTestBase): + + """Tests for get_authorized_endpoint_status()""" + + def test_superuser_gets_all_endpoint_statuses(self): + """Superuser should get all endpoint statuses""" + endpoint_statuses = get_authorized_endpoint_status(Permissions.Endpoint_View, user=self.superuser) + self.assertIn(self.endpoint_status_1, endpoint_statuses) + self.assertIn(self.endpoint_status_2, endpoint_statuses) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test endpoint statuses""" + endpoint_statuses = get_authorized_endpoint_status(Permissions.Endpoint_View, user=self.user_no_perms) + self.assertNotIn(self.endpoint_status_1, endpoint_statuses) + self.assertNotIn(self.endpoint_status_2, endpoint_statuses) + + def test_user_product_member_gets_product_endpoint_statuses(self): + """User with product membership should get only that product's endpoint statuses""" + endpoint_statuses = get_authorized_endpoint_status(Permissions.Endpoint_View, user=self.user_product_member) + self.assertIn(self.endpoint_status_1, endpoint_statuses) + self.assertNotIn(self.endpoint_status_2, endpoint_statuses) + + +class TestGetAuthorizedGroups(AuthorizationQueriesTestBase): + + """Tests for get_authorized_groups() - uses get_current_user()""" + + @patch("dojo.group.queries.get_current_user") + def test_superuser_gets_all_groups(self, mock_get_current_user): + """Superuser should get all groups""" + mock_get_current_user.return_value = self.superuser + groups = get_authorized_groups(Permissions.Group_View) + self.assertIn(self.group_product, groups) + self.assertIn(self.group_product_type, groups) + + @patch("dojo.group.queries.get_current_user") + def test_user_group_member_gets_own_groups(self, mock_get_current_user): + """User who is a group member should get that group""" + mock_get_current_user.return_value = self.user_group_product_member + groups = get_authorized_groups(Permissions.Group_View) + self.assertIn(self.group_product, groups) + + +class TestGetAuthorizedFindingGroups(AuthorizationQueriesTestBase): + + """Tests for get_authorized_finding_groups()""" + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + # Create finding groups - creator is required + cls.finding_group_1, _ = Finding_Group.objects.get_or_create( + name="Auth Test Finding Group 1", + test=cls.test_1, + defaults={"creator": cls.superuser}, + ) + cls.finding_group_2, _ = Finding_Group.objects.get_or_create( + name="Auth Test Finding Group 2", + test=cls.test_2, + defaults={"creator": cls.superuser}, + ) + + def test_superuser_gets_all_finding_groups(self): + """Superuser should get all finding groups""" + finding_groups = get_authorized_finding_groups(Permissions.Finding_Group_View, user=self.superuser) + self.assertIn(self.finding_group_1, finding_groups) + self.assertIn(self.finding_group_2, finding_groups) + + def test_user_no_permissions_gets_empty(self): + """User with no permissions should not get test finding groups""" + finding_groups = get_authorized_finding_groups(Permissions.Finding_Group_View, user=self.user_no_perms) + self.assertNotIn(self.finding_group_1, finding_groups) + self.assertNotIn(self.finding_group_2, finding_groups) + + def test_user_product_member_gets_product_finding_groups(self): + """User with product membership should get only that product's finding groups""" + finding_groups = get_authorized_finding_groups(Permissions.Finding_Group_View, user=self.user_product_member) + self.assertIn(self.finding_group_1, finding_groups) + self.assertNotIn(self.finding_group_2, finding_groups) + + +# Note: Tests for get_authorized_risk_acceptances(), get_authorized_jira_projects(), +# and get_authorized_jira_issues() require complex model setups (JIRA_Instance with many +# required fields, Risk_Acceptance with engagement relations). These are covered by +# the existing REST API tests in test_rest_framework.py. diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index c9bd839be00..d1f05328faa 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -265,20 +265,12 @@ def test_import_reimport_reimport_performance_pghistory_async(self): configure_pghistory_triggers() self._import_reimport_performance( - - - - - expected_num_queries1=305, + expected_num_queries1=295, expected_num_async_tasks1=6, - expected_num_queries2=232, + expected_num_queries2=227, expected_num_async_tasks2=17, - expected_num_queries3=114, + expected_num_queries3=109, expected_num_async_tasks3=16, - - - - ) @override_settings(ENABLE_AUDITLOG=True) @@ -295,14 +287,12 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._import_reimport_performance( - - expected_num_queries1=312, + expected_num_queries1=302, expected_num_async_tasks1=6, - expected_num_queries2=239, + expected_num_queries2=234, expected_num_async_tasks2=17, - expected_num_queries3=121, + expected_num_queries3=116, expected_num_async_tasks3=16, - ) @override_settings(ENABLE_AUDITLOG=True) @@ -320,11 +310,11 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr self.system_settings(enable_product_grade=True) self._import_reimport_performance( - expected_num_queries1=319, + expected_num_queries1=309, expected_num_async_tasks1=8, - expected_num_queries2=246, + expected_num_queries2=241, expected_num_async_tasks2=19, - expected_num_queries3=125, + expected_num_queries3=120, expected_num_async_tasks3=18, ) @@ -443,9 +433,9 @@ def test_deduplication_performance_pghistory_async(self): self.system_settings(enable_deduplication=True) self._deduplication_performance( - expected_num_queries1=274, + expected_num_queries1=264, expected_num_async_tasks1=7, - expected_num_queries2=185, + expected_num_queries2=175, expected_num_async_tasks2=7, check_duplicates=False, # Async mode - deduplication happens later ) @@ -464,11 +454,8 @@ def test_deduplication_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._deduplication_performance( - - expected_num_queries1=281, + expected_num_queries1=271, expected_num_async_tasks1=7, - expected_num_queries2=246, + expected_num_queries2=236, expected_num_async_tasks2=7, - - ) diff --git a/unittests/test_metrics_queries.py b/unittests/test_metrics_queries.py index 751122c54ac..5569b446466 100644 --- a/unittests/test_metrics_queries.py +++ b/unittests/test_metrics_queries.py @@ -317,12 +317,12 @@ def test_endpoint_queries(self, mock_now): self.assertCountEqual( endpoint_queries["all"].values(), [ - {"id": 1, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 2, "finding_id": 2, "endpoint__product__prod_type__member": False, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, - {"id": 3, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": True, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 5, "finding_id": 228, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, - {"id": 4, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": True, "risk_accepted": False, "endpoint_id": 5, "finding_id": 229, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, - {"id": 5, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": True, "endpoint_id": 5, "finding_id": 230, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, - {"id": 7, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 7, "finding_id": 227, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, - {"id": 8, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 8, "finding_id": 231, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}, + {"id": 1, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 2, "finding_id": 2}, + {"id": 3, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": True, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 5, "finding_id": 228}, + {"id": 4, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": True, "risk_accepted": False, "endpoint_id": 5, "finding_id": 229}, + {"id": 5, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": True, "endpoint_id": 5, "finding_id": 230}, + {"id": 7, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 7, "finding_id": 227}, + {"id": 8, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": False, "endpoint_id": 8, "finding_id": 231}, ], ) self.assertSequenceEqual( @@ -331,7 +331,7 @@ def test_endpoint_queries(self, mock_now): ) self.assertSequenceEqual( endpoint_queries["accepted"].values(), - [{"id": 5, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": True, "endpoint_id": 5, "finding_id": 230, "endpoint__product__prod_type__member": True, "endpoint__product__member": True, "endpoint__product__prod_type__authorized_group": False, "endpoint__product__authorized_group": False}], + [{"id": 5, "date": date(2020, 7, 1), "last_modified": datetime(2020, 7, 1, 17, 45, 39, 791907, tzinfo=zoneinfo.ZoneInfo("UTC")), "mitigated": False, "mitigated_time": None, "mitigated_by_id": None, "false_positive": False, "out_of_scope": False, "risk_accepted": True, "endpoint_id": 5, "finding_id": 230}], ) self.assertSequenceEqual( list(endpoint_queries["accepted_count"].values()), diff --git a/unittests/test_user_queries.py b/unittests/test_user_queries.py index b4477f0f509..216c2bf1e75 100644 --- a/unittests/test_user_queries.py +++ b/unittests/test_user_queries.py @@ -1,8 +1,24 @@ from unittest.mock import patch from dojo.authorization.roles_permissions import Permissions -from dojo.models import Dojo_User, Global_Role, Product, Product_Member, Product_Type, Product_Type_Member, Role -from dojo.user.queries import get_authorized_users +from dojo.models import ( + Dojo_Group, + Dojo_Group_Member, + Dojo_User, + Global_Role, + Product, + Product_Group, + Product_Member, + Product_Type, + Product_Type_Group, + Product_Type_Member, + Role, +) +from dojo.user.queries import ( + get_authorized_users, + get_authorized_users_for_product_and_product_type, + get_authorized_users_for_product_type, +) from .dojo_test_case import DojoTestCase @@ -83,3 +99,273 @@ def test_user_regular(self, mock_current_user_1, mock_current_user_2): users = Dojo_User.objects.exclude(username="invisible_user").order_by("first_name", "last_name", "username") self.assertQuerySetEqual(users, get_authorized_users(Permissions.Product_View)) + + +class TestGetAuthorizedUsersForProductType(DojoTestCase): + + """Tests for get_authorized_users_for_product_type()""" + + @classmethod + def setUpTestData(cls): + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + + # Create users with different permission levels + cls.superuser = Dojo_User.objects.create( + username="uq_pt_superuser", + is_superuser=True, + is_active=True, + ) + cls.user_no_perms = Dojo_User.objects.create( + username="uq_pt_no_perms", + is_active=True, + ) + cls.user_product_type_member = Dojo_User.objects.create( + username="uq_pt_member", + is_active=True, + ) + cls.user_global_reader = Dojo_User.objects.create( + username="uq_pt_global_reader", + is_active=True, + ) + cls.user_group_member = Dojo_User.objects.create( + username="uq_pt_group_member", + is_active=True, + ) + + # Create product type + cls.product_type = Product_Type.objects.create(name="UQ Test PT") + + # Set up memberships + Product_Type_Member.objects.create( + user=cls.user_product_type_member, + product_type=cls.product_type, + role=cls.reader_role, + ) + Global_Role.objects.create( + user=cls.user_global_reader, + role=cls.reader_role, + ) + + # Create group and group membership + cls.group = Dojo_Group.objects.create(name="UQ PT Test Group") + Dojo_Group_Member.objects.create( + user=cls.user_group_member, + group=cls.group, + role=cls.reader_role, + ) + Product_Type_Group.objects.create( + product_type=cls.product_type, + group=cls.group, + role=cls.reader_role, + ) + + def test_superuser_included(self): + """Superusers should always be included""" + users = get_authorized_users_for_product_type( + Dojo_User.objects.all(), + self.product_type, + Permissions.Product_Type_View, + ) + self.assertIn(self.superuser, users) + + def test_global_role_user_included(self): + """Users with global roles should be included""" + users = get_authorized_users_for_product_type( + Dojo_User.objects.all(), + self.product_type, + Permissions.Product_Type_View, + ) + self.assertIn(self.user_global_reader, users) + + def test_product_type_member_included(self): + """Product type members with sufficient role should be included""" + users = get_authorized_users_for_product_type( + Dojo_User.objects.all(), + self.product_type, + Permissions.Product_Type_View, + ) + self.assertIn(self.user_product_type_member, users) + + def test_no_perms_user_excluded(self): + """Users without any relevant permissions should be excluded""" + users = get_authorized_users_for_product_type( + Dojo_User.objects.all(), + self.product_type, + Permissions.Product_Type_View, + ) + self.assertNotIn(self.user_no_perms, users) + + def test_group_member_included(self): + """Users in groups with product type access should be included""" + users = get_authorized_users_for_product_type( + Dojo_User.objects.all(), + self.product_type, + Permissions.Product_Type_View, + ) + self.assertIn(self.user_group_member, users) + + +class TestGetAuthorizedUsersForProductAndProductType(DojoTestCase): + + """Tests for get_authorized_users_for_product_and_product_type()""" + + @classmethod + def setUpTestData(cls): + cls.reader_role = Role.objects.get(name="Reader") + cls.writer_role = Role.objects.get(name="Writer") + + # Create users with different permission levels + cls.superuser = Dojo_User.objects.create( + username="uq_ppt_superuser", + is_superuser=True, + is_active=True, + ) + cls.user_no_perms = Dojo_User.objects.create( + username="uq_ppt_no_perms", + is_active=True, + ) + cls.user_product_member = Dojo_User.objects.create( + username="uq_ppt_prod_member", + is_active=True, + ) + cls.user_product_type_member = Dojo_User.objects.create( + username="uq_ppt_pt_member", + is_active=True, + ) + cls.user_global_reader = Dojo_User.objects.create( + username="uq_ppt_global_reader", + is_active=True, + ) + cls.user_group_product_member = Dojo_User.objects.create( + username="uq_ppt_group_prod_member", + is_active=True, + ) + cls.user_group_product_type_member = Dojo_User.objects.create( + username="uq_ppt_group_pt_member", + is_active=True, + ) + + # Create product type and product + cls.product_type = Product_Type.objects.create(name="UQ PPT Test PT") + cls.product = Product.objects.create( + name="UQ PPT Test Product", + prod_type=cls.product_type, + ) + + # Set up direct memberships + Product_Member.objects.create( + user=cls.user_product_member, + product=cls.product, + role=cls.reader_role, + ) + Product_Type_Member.objects.create( + user=cls.user_product_type_member, + product_type=cls.product_type, + role=cls.reader_role, + ) + Global_Role.objects.create( + user=cls.user_global_reader, + role=cls.reader_role, + ) + + # Create groups and group memberships + cls.group_product = Dojo_Group.objects.create(name="UQ PPT Product Group") + cls.group_product_type = Dojo_Group.objects.create(name="UQ PPT Product Type Group") + + Dojo_Group_Member.objects.create( + user=cls.user_group_product_member, + group=cls.group_product, + role=cls.reader_role, + ) + Dojo_Group_Member.objects.create( + user=cls.user_group_product_type_member, + group=cls.group_product_type, + role=cls.reader_role, + ) + + Product_Group.objects.create( + product=cls.product, + group=cls.group_product, + role=cls.reader_role, + ) + Product_Type_Group.objects.create( + product_type=cls.product_type, + group=cls.group_product_type, + role=cls.reader_role, + ) + + def test_superuser_included(self): + """Superusers should always be included""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.superuser, users) + + def test_global_role_user_included(self): + """Users with global roles should be included""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.user_global_reader, users) + + def test_product_member_included(self): + """Product members with sufficient role should be included""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.user_product_member, users) + + def test_product_type_member_included(self): + """Product type members should be included (inheritance)""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.user_product_type_member, users) + + def test_no_perms_user_excluded(self): + """Users without any relevant permissions should be excluded""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertNotIn(self.user_no_perms, users) + + def test_group_product_member_included(self): + """Users in groups with product access should be included""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.user_group_product_member, users) + + def test_group_product_type_member_included(self): + """Users in groups with product type access should be included""" + users = get_authorized_users_for_product_and_product_type( + None, + self.product, + Permissions.Product_View, + ) + self.assertIn(self.user_group_product_type_member, users) + + def test_users_parameter_filters_base_queryset(self): + """Passing a users queryset should filter within that queryset""" + active_users = Dojo_User.objects.filter(is_active=True) + users = get_authorized_users_for_product_and_product_type( + active_users, + self.product, + Permissions.Product_View, + ) + # All returned users should be active + for user in users: + self.assertTrue(user.is_active)