Skip to content

Commit 66dc954

Browse files
committed
Add tests
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent 43d9ac4 commit 66dc954

File tree

3 files changed

+150
-11
lines changed

3 files changed

+150
-11
lines changed

vulnerabilities/improvers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
from vulnerabilities.pipelines import enhance_with_exploitdb
1818
from vulnerabilities.pipelines import enhance_with_kev
1919
from vulnerabilities.pipelines import enhance_with_metasploit
20-
from vulnerabilities.pipelines import flag_ghost_packages
2120
from vulnerabilities.pipelines import fill_vulnerability_summary_pipeline
21+
from vulnerabilities.pipelines import flag_ghost_packages
2222

2323
IMPROVERS_REGISTRY = [
2424
valid_versions.GitHubBasicImprover,
Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
import logging
2-
from vulnerabilities.models import Vulnerability, Advisory
3-
from vulnerabilities.pipelines import VulnerableCodePipeline
2+
43
from django.db.models import Q
54

5+
from vulnerabilities.models import Advisory
6+
from vulnerabilities.models import Vulnerability
7+
from vulnerabilities.pipelines import VulnerableCodePipeline
8+
69

710
class FillVulnerabilitySummariesPipeline(VulnerableCodePipeline):
811
"""Pipeline to fill missing vulnerability summaries from advisories."""
@@ -16,27 +19,43 @@ def steps(cls):
1619
def fill_missing_summaries(self):
1720
"""Find vulnerabilities without summaries and fill them using advisories with the same aliases."""
1821
vulnerabilities_qs = Vulnerability.objects.filter(summary="").prefetch_related("aliases")
19-
self.log(f"Processing {vulnerabilities_qs.count()} vulnerabilities without summaries", level=logging.INFO)
20-
nvd_importer_advisories = Advisory.objects.filter(created_by="nvd_importer", summary__isnull=False).exclude(summary="")
21-
self.log(f"Found {nvd_importer_advisories.count()} advisories from NVD importer", level=logging.INFO)
22-
22+
self.log(
23+
f"Processing {vulnerabilities_qs.count()} vulnerabilities without summaries",
24+
level=logging.INFO,
25+
)
26+
nvd_importer_advisories = Advisory.objects.filter(
27+
created_by="nvd_importer", summary__isnull=False
28+
).exclude(summary="")
29+
self.log(
30+
f"Found {nvd_importer_advisories.count()} advisories from NVD importer",
31+
level=logging.INFO,
32+
)
33+
2334
for vulnerability in vulnerabilities_qs:
2435
aliases = vulnerability.aliases.values_list("alias", flat=True)
2536
alias = aliases.first()
2637

2738
# check if the vulnerability has an alias
2839
if not alias:
29-
self.log(f"Vulnerability {vulnerability.vulnerability_id} has no alias", level=logging.INFO)
40+
self.log(
41+
f"Vulnerability {vulnerability.vulnerability_id} has no alias",
42+
level=logging.INFO,
43+
)
3044
continue
3145

3246
# check if the vulnerability has an alias that matches an advisory
3347
matching_advisories = nvd_importer_advisories.filter(Q(aliases__contains=alias))
34-
48+
3549
if matching_advisories.exists():
36-
best_advisory = matching_advisories.first() # Take the first matching advisory with a summary
50+
best_advisory = (
51+
matching_advisories.first()
52+
) # Take the first matching advisory with a summary
3753
vulnerability.summary = best_advisory.summary
3854
vulnerability.save()
3955
if self.log:
40-
self.log(f"Updated summary for vulnerability {vulnerability.vulnerability_id}", level=logging.INFO)
56+
self.log(
57+
f"Updated summary for vulnerability {vulnerability.vulnerability_id}",
58+
level=logging.INFO,
59+
)
4160
else:
4261
self.log(f"No advisory found for alias {alias}", level=logging.INFO)
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import datetime
11+
from pathlib import Path
12+
13+
import pytz
14+
from django.test import TestCase
15+
16+
from vulnerabilities.models import Advisory
17+
from vulnerabilities.models import Alias
18+
from vulnerabilities.models import Vulnerability
19+
from vulnerabilities.pipelines.fill_vulnerability_summary_pipeline import (
20+
FillVulnerabilitySummariesPipeline,
21+
)
22+
23+
24+
class FillVulnerabilitySummariesPipelineTest(TestCase):
25+
def setUp(self):
26+
self.data = Path(__file__).parent.parent / "test_data"
27+
28+
def test_fill_missing_summaries_from_nvd(self):
29+
"""
30+
Test that vulnerabilities without summaries get them from NVD advisories.
31+
"""
32+
33+
# Create a vulnerability without a summary
34+
vulnerability = Vulnerability.objects.create(
35+
vulnerability_id="VCID-1234",
36+
summary="",
37+
)
38+
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
39+
40+
# Create an NVD advisory with a summary
41+
Advisory.objects.create(
42+
summary="Test vulnerability summary",
43+
created_by="nvd_importer",
44+
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC),
45+
aliases=["CVE-2024-1234"],
46+
)
47+
48+
# Run the pipeline
49+
pipeline = FillVulnerabilitySummariesPipeline()
50+
pipeline.fill_missing_summaries()
51+
52+
# Check that the vulnerability now has a summary
53+
vulnerability.refresh_from_db()
54+
self.assertEqual(vulnerability.summary, "Test vulnerability summary")
55+
56+
def test_no_matching_advisory(self):
57+
"""
58+
Test handling of vulnerabilities that have no matching NVD advisory.
59+
"""
60+
# Create a vulnerability without a summary
61+
vulnerability = Vulnerability.objects.create(
62+
vulnerability_id="VCID-1234",
63+
summary="",
64+
)
65+
Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
66+
67+
# Run the pipeline
68+
pipeline = FillVulnerabilitySummariesPipeline()
69+
pipeline.fill_missing_summaries()
70+
71+
# Check that the vulnerability still has no summary
72+
vulnerability.refresh_from_db()
73+
self.assertEqual(vulnerability.summary, "")
74+
75+
def test_vulnerability_without_alias(self):
76+
"""
77+
Test handling of vulnerabilities that have no aliases.
78+
"""
79+
80+
# Create a vulnerability without a summary or alias
81+
vulnerability = Vulnerability.objects.create(
82+
vulnerability_id="VCID-1234",
83+
summary="",
84+
)
85+
86+
# Run the pipeline
87+
pipeline = FillVulnerabilitySummariesPipeline()
88+
pipeline.fill_missing_summaries()
89+
90+
# Check that the vulnerability still has no summary
91+
vulnerability.refresh_from_db()
92+
self.assertEqual(vulnerability.summary, "")
93+
94+
def test_non_nvd_advisory_ignored(self):
95+
"""
96+
Test that advisories from sources other than NVD are ignored.
97+
"""
98+
99+
# Create a vulnerability without a summary
100+
vulnerability = Vulnerability.objects.create(
101+
vulnerability_id="VCID-1234",
102+
summary="",
103+
)
104+
alias = Alias.objects.create(alias="CVE-2024-1234", vulnerability=vulnerability)
105+
106+
# Create a non-NVD advisory with a summary
107+
Advisory.objects.create(
108+
summary="Test vulnerability summary",
109+
created_by="other_importer",
110+
date_collected=datetime.datetime(2024, 1, 1, tzinfo=pytz.UTC),
111+
aliases=["CVE-2024-1234"],
112+
)
113+
114+
# Run the pipeline
115+
pipeline = FillVulnerabilitySummariesPipeline()
116+
pipeline.fill_missing_summaries()
117+
118+
# Check that the vulnerability still has no summary
119+
vulnerability.refresh_from_db()
120+
self.assertEqual(vulnerability.summary, "")

0 commit comments

Comments
 (0)