From 1cc51a2c67ae0e72dfef67e0db8d600c4051746f Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Wed, 21 Jan 2026 16:16:29 +0100 Subject: [PATCH 1/3] Fix risk acceptance API to link to engagement and add validations Fixes #12644 This commit addresses several issues with the risk acceptance API: 1. Risk acceptances created via API now appear in engagement panel - Added engagement.risk_acceptance.add(instance) in create() method - Fixes the main bug where API-created risk acceptances were orphaned 2. Added validation for enable_full_risk_acceptance product setting - API now respects the product-level setting before creating instances - Validates in validate() method to fail early 3. Added protection against engagement switching - Prevents moving risk acceptances between engagements via PATCH/PUT - Validates even when risk acceptance has no findings (edge case) 4. Performance improvement - Use self.instance.accepted_findings.all() instead of filtering 5. Comprehensive API tests - Added test_risk_acceptance_api.py with 7 test cases - Covers all edge cases and validation scenarios - All tests passing Changes: - dojo/api_v2/serializers.py: Enhanced RiskAcceptanceSerializer - unittests/test_risk_acceptance_api.py: New comprehensive test suite --- dojo/api_v2/serializers.py | 24 +- unittests/test_risk_acceptance_api.py | 360 ++++++++++++++++++++++++++ 2 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 unittests/test_risk_acceptance_api.py diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 9a17e4be98..c78c81d4d5 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -1535,6 +1535,12 @@ def create(self, validated_data): instance = super().create(validated_data) user = getattr(self.context.get("request", None), "user", None) ra_helper.add_findings_to_risk_acceptance(user, instance, instance.accepted_findings.all()) + + # Add risk acceptance to engagement + if instance.accepted_findings.exists(): + engagement = instance.accepted_findings.first().test.engagement + engagement.risk_acceptance.add(instance) + return instance def update(self, instance, validated_data): @@ -1596,10 +1602,26 @@ def validate_findings_have_same_engagement(finding_objects: list[Finding]): raise PermissionDenied(msg) if self.context["request"].method == "POST": validate_findings_have_same_engagement(finding_objects) + + # Validate product allows full risk acceptance BEFORE creating instance + if finding_objects.exists(): + engagement = finding_objects.first().test.engagement + if not engagement.product.enable_full_risk_acceptance: + msg = "Full risk acceptance is not enabled for this product" + raise PermissionDenied(msg) elif self.context["request"].method in {"PATCH", "PUT"}: - existing_findings = Finding.objects.filter(risk_acceptance=self.instance.id) + # Use the reverse relation instead of filtering + existing_findings = self.instance.accepted_findings.all() existing_and_new_findings = existing_findings | finding_objects validate_findings_have_same_engagement(existing_and_new_findings) + + # Explicit check to prevent engagement switching + risk_acceptance_engagement = self.instance.engagement + if risk_acceptance_engagement and finding_objects.exists(): + new_findings_engagement = finding_objects.first().test.engagement + if risk_acceptance_engagement.id != new_findings_engagement.id: + msg = f"Risk Acceptance belongs to engagement {risk_acceptance_engagement.id}. Cannot add findings from engagement {new_findings_engagement.id}" + raise ValidationError(msg) return data class Meta: diff --git a/unittests/test_risk_acceptance_api.py b/unittests/test_risk_acceptance_api.py new file mode 100644 index 0000000000..45756bb6e5 --- /dev/null +++ b/unittests/test_risk_acceptance_api.py @@ -0,0 +1,360 @@ +import datetime + +from rest_framework.authtoken.models import Token +from rest_framework.reverse import reverse +from rest_framework.test import APIClient, APITestCase + +from dojo.authorization.roles_permissions import Roles +from dojo.models import ( + Engagement, + Finding, + Product, + Product_Type, + Product_Type_Member, + Risk_Acceptance, + Role, + Test, + Test_Type, + User, +) + + +class TestRiskAcceptanceApi(APITestCase): + + """ + Comprehensive API tests for Risk Acceptance creation and updates. + Tests edge cases, validations, and engagement linking. + """ + + @classmethod + def setUpTestData(cls): + # Create user with permissions + cls.user = User.objects.create(username="test_user", first_name="Test", last_name="User", is_staff=True) + cls.token = Token.objects.create(user=cls.user) + + # Create product type + cls.product_type = Product_Type.objects.create(name="Test Product Type") + Product_Type_Member.objects.create(product_type=cls.product_type, user=cls.user, role=Role.objects.get(id=Roles.Owner)) + + # Create product with full risk acceptance enabled + cls.product_enabled = Product.objects.create( + prod_type=cls.product_type, + name="Product with RA Enabled", + description="Test product with full risk acceptance enabled", + enable_full_risk_acceptance=True, + ) + + # Create product with full risk acceptance disabled + cls.product_disabled = Product.objects.create( + prod_type=cls.product_type, + name="Product with RA Disabled", + description="Test product with full risk acceptance disabled", + enable_full_risk_acceptance=False, + ) + + # Create engagements + cls.engagement_a = Engagement.objects.create( + product=cls.product_enabled, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + name="Engagement A", + ) + + cls.engagement_b = Engagement.objects.create( + product=cls.product_enabled, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + name="Engagement B", + ) + + cls.engagement_disabled = Engagement.objects.create( + product=cls.product_disabled, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + name="Engagement Disabled", + ) + + # Create test type and tests + cls.test_type = Test_Type.objects.create(name="API Test Scan", static_tool=True) + + cls.test_a1 = Test.objects.create( + engagement=cls.engagement_a, + test_type=cls.test_type, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + ) + + cls.test_b1 = Test.objects.create( + engagement=cls.engagement_b, + test_type=cls.test_type, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + ) + + cls.test_disabled = Test.objects.create( + engagement=cls.engagement_disabled, + test_type=cls.test_type, + target_start=datetime.datetime(2000, 1, 1, tzinfo=datetime.UTC), + target_end=datetime.datetime(2000, 2, 1, tzinfo=datetime.UTC), + ) + + # Create findings + cls.finding_a1 = Finding.objects.create( + test=cls.test_a1, + title="Finding A1", + severity="High", + verified=True, + description="Test finding in engagement A", + reporter=cls.user, + numerical_severity="S1", + static_finding=True, + dynamic_finding=False, + ) + + cls.finding_a2 = Finding.objects.create( + test=cls.test_a1, + title="Finding A2", + severity="Medium", + verified=True, + description="Another test finding in engagement A", + reporter=cls.user, + numerical_severity="S2", + static_finding=True, + dynamic_finding=False, + ) + + cls.finding_a3 = Finding.objects.create( + test=cls.test_a1, + title="Finding A3", + severity="Low", + verified=True, + description="Third test finding in engagement A", + reporter=cls.user, + numerical_severity="S3", + static_finding=True, + dynamic_finding=False, + ) + + cls.finding_b1 = Finding.objects.create( + test=cls.test_b1, + title="Finding B1", + severity="High", + verified=True, + description="Test finding in engagement B", + reporter=cls.user, + numerical_severity="S1", + static_finding=True, + dynamic_finding=False, + ) + + cls.finding_b2 = Finding.objects.create( + test=cls.test_b1, + title="Finding B2", + severity="Medium", + verified=True, + description="Another test finding in engagement B", + reporter=cls.user, + numerical_severity="S2", + static_finding=True, + dynamic_finding=False, + ) + + cls.finding_disabled = Finding.objects.create( + test=cls.test_disabled, + title="Finding Disabled", + severity="High", + verified=True, + description="Test finding in disabled engagement", + reporter=cls.user, + numerical_severity="S1", + static_finding=True, + dynamic_finding=False, + ) + + def setUp(self): + self.client = APIClient() + self.client.credentials(HTTP_AUTHORIZATION="Token " + self.token.key) + self.url = reverse("risk_acceptance-list") + + def test_create_risk_acceptance_links_to_engagement(self): + """Test that risk acceptance created via API appears in engagement.risk_acceptance""" + payload = { + "name": "Test Risk Acceptance", + "recommendation": "A", + "recommendation_details": "Test recommendation", + "decision": "A", + "decision_details": "Test decision", + "accepted_by": "Test User", + "owner": self.user.id, + "expiration_date": "2025-12-31T23:59:59Z", + "reactivate_expired": True, + "restart_sla_expired": False, + "accepted_findings": [self.finding_a1.id, self.finding_a2.id], + } + + response = self.client.post(self.url, payload, format="json") + self.assertEqual(201, response.status_code, response.content) + + # Verify risk acceptance was created + ra_id = response.data["id"] + risk_acceptance = Risk_Acceptance.objects.get(id=ra_id) + + # Verify it's linked to the engagement + self.assertIn(risk_acceptance, self.engagement_a.risk_acceptance.all()) + self.assertEqual(1, self.engagement_a.risk_acceptance.count()) + + # Verify findings are marked as risk accepted + self.finding_a1.refresh_from_db() + self.finding_a2.refresh_from_db() + self.assertTrue(self.finding_a1.risk_accepted) + self.assertTrue(self.finding_a2.risk_accepted) + + def test_create_risk_acceptance_multiple_engagements_fails(self): + """Test that creating risk acceptance with findings from multiple engagements fails""" + payload = { + "name": "Invalid Risk Acceptance", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_a1.id, self.finding_b1.id], # Different engagements! + } + + response = self.client.post(self.url, payload, format="json") + self.assertEqual(403, response.status_code, response.content) + self.assertIn("multiple engagements", str(response.data)) + + def test_create_risk_acceptance_product_disabled_fails(self): + """Test that creating risk acceptance fails when product has enable_full_risk_acceptance=False""" + payload = { + "name": "Disabled Product Risk Acceptance", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_disabled.id], + } + + response = self.client.post(self.url, payload, format="json") + self.assertEqual(403, response.status_code, response.content) + self.assertIn("not enabled", str(response.data)) + + # Verify no risk acceptance was created + self.assertEqual(0, self.engagement_disabled.risk_acceptance.count()) + + def test_update_risk_acceptance_add_findings_same_engagement(self): + """Test that updating to add more findings from same engagement succeeds""" + # Create risk acceptance with one finding + ra = Risk_Acceptance.objects.create( + name="Initial RA", + recommendation="A", + decision="A", + accepted_by="Test User", + owner=self.user, + ) + ra.accepted_findings.add(self.finding_a1) + self.engagement_a.risk_acceptance.add(ra) + + # Update to add another finding from same engagement + payload = { + "name": "Updated RA", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_a1.id, self.finding_a2.id], + } + + response = self.client.put(f"{self.url}{ra.id}/", payload, format="json") + self.assertEqual(200, response.status_code, response.content) + + # Verify both findings are now in the risk acceptance + ra.refresh_from_db() + self.assertEqual(2, ra.accepted_findings.count()) + self.assertIn(self.finding_a1, ra.accepted_findings.all()) + self.assertIn(self.finding_a2, ra.accepted_findings.all()) + + def test_update_risk_acceptance_switch_engagement_fails(self): + """Test that replacing all findings with findings from different engagement fails""" + # Create risk acceptance with findings from engagement A + ra = Risk_Acceptance.objects.create( + name="RA for Engagement A", + recommendation="A", + decision="A", + accepted_by="Test User", + owner=self.user, + ) + ra.accepted_findings.add(self.finding_a1) + self.engagement_a.risk_acceptance.add(ra) + + # Try to replace with finding from engagement B + payload = { + "name": "RA for Engagement A", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_b1.id], # Different engagement! + } + + response = self.client.put(f"{self.url}{ra.id}/", payload, format="json") + # Returns 403 because the validate_findings_have_same_engagement check runs first + self.assertEqual(403, response.status_code, response.content) + self.assertIn("multiple engagements", str(response.data)) + + def test_update_risk_acceptance_empty_then_different_engagement_fails(self): + """Test edge case: risk acceptance with no findings cannot accept findings from different engagement""" + # Create risk acceptance with findings from engagement A, then remove them + ra = Risk_Acceptance.objects.create( + name="RA for Engagement A", + recommendation="A", + decision="A", + accepted_by="Test User", + owner=self.user, + ) + ra.accepted_findings.add(self.finding_a1) + self.engagement_a.risk_acceptance.add(ra) + + # Remove all findings (make it empty) + ra.accepted_findings.clear() + + # Try to add finding from engagement B + payload = { + "name": "RA for Engagement A", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_b1.id], # Different engagement! + } + + response = self.client.put(f"{self.url}{ra.id}/", payload, format="json") + self.assertEqual(400, response.status_code, response.content) + self.assertIn("belongs to engagement", str(response.data)) + + def test_update_risk_acceptance_add_cross_engagement_fails(self): + """Test that adding findings from different engagement while keeping existing ones fails""" + # Create risk acceptance with finding from engagement A + ra = Risk_Acceptance.objects.create( + name="RA for Engagement A", + recommendation="A", + decision="A", + accepted_by="Test User", + owner=self.user, + ) + ra.accepted_findings.add(self.finding_a1) + self.engagement_a.risk_acceptance.add(ra) + + # Try to add findings from both engagements + payload = { + "name": "RA for Engagement A", + "recommendation": "A", + "decision": "A", + "accepted_by": "Test User", + "owner": self.user.id, + "accepted_findings": [self.finding_a1.id, self.finding_b1.id], # Mixed engagements! + } + + response = self.client.put(f"{self.url}{ra.id}/", payload, format="json") + self.assertEqual(403, response.status_code, response.content) + self.assertIn("multiple engagements", str(response.data)) From 26233e3497069135ecc3349ad7b9a750f280e536 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Wed, 21 Jan 2026 19:33:04 +0100 Subject: [PATCH 2/3] fix permission check --- dojo/api_v2/serializers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index c78c81d4d5..71e9388482 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -1596,7 +1596,7 @@ def validate_findings_have_same_engagement(finding_objects: list[Finding]): findings = data.get("accepted_findings", []) findings_ids = [x.id for x in findings] finding_objects = Finding.objects.filter(id__in=findings_ids) - authed_findings = get_authorized_findings(Permissions.Finding_Edit).filter(id__in=findings_ids) + authed_findings = get_authorized_findings(Permissions.Risk_Acceptance).filter(id__in=findings_ids) if len(findings) != len(authed_findings): msg = "You are not permitted to add one or more selected findings to this risk acceptance" raise PermissionDenied(msg) From 889e3e62f10e18885d7bd354222edd4f777ae528 Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Wed, 21 Jan 2026 19:46:50 +0100 Subject: [PATCH 3/3] prevent orphaned RAs getting findings but no eng --- dojo/api_v2/serializers.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 71e9388482..09b34cab85 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -1537,6 +1537,7 @@ def create(self, validated_data): ra_helper.add_findings_to_risk_acceptance(user, instance, instance.accepted_findings.all()) # Add risk acceptance to engagement + # This is fine as Pro has its own model + relationshop to track links with engagements. if instance.accepted_findings.exists(): engagement = instance.accepted_findings.first().test.engagement engagement.risk_acceptance.add(instance) @@ -1560,6 +1561,13 @@ def update(self, instance, validated_data): # Remove the ones that were not present in the payload for finding in findings_to_remove: ra_helper.remove_finding_from_risk_acceptance(user, instance, finding) + + # Handle orphaned risk acceptances: link to engagement if it now has findings + # This is fine as Pro has its own model + relationshop to track links with engagements. + if instance.accepted_findings.exists() and not instance.engagement: + engagement = instance.accepted_findings.first().test.engagement + engagement.risk_acceptance.add(instance) + return instance @extend_schema_field(serializers.CharField())