Skip to content

Commit c6345fb

Browse files
committed
Add tests for Pypa importer
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent a841280 commit c6345fb

File tree

3 files changed

+174
-3
lines changed

3 files changed

+174
-3
lines changed

vulnerabilities/pipelines/v2_importers/pypa_importer.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from fetchcode.vcs import fetch_via_vcs
1515

1616
from vulnerabilities.importer import AdvisoryData
17-
from vulnerabilities.importers.osv import parse_advisory_data_v2
1817
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
1918
from vulnerabilities.utils import get_advisory_url
2019

@@ -46,7 +45,7 @@ def advisories_count(self):
4645
return sum(1 for _ in vulns_directory.rglob("*.yaml"))
4746

4847
def collect_advisories(self) -> Iterable[AdvisoryData]:
49-
from vulnerabilities.importers.osv import parse_advisory_data
48+
from vulnerabilities.importers.osv import parse_advisory_data_v2
5049

5150
base_directory = Path(self.vcs_response.dest_dir)
5251
vulns_directory = base_directory / "vulns"

vulnerabilities/tests/pipelines/test_apache_httpd_importer_pipeline_v2.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import ApacheHTTPDImporterPipeline
1515
from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import fetch_links
1616
from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import get_weaknesses
17-
from vulnerabilities.severity_systems import APACHE_HTTPD
1817

1918

2019
# Dummy responses
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
from unittest.mock import MagicMock
11+
from unittest.mock import patch
12+
13+
import pytest
14+
import saneyaml
15+
16+
from vulnerabilities.importer import AdvisoryData
17+
18+
19+
@pytest.fixture
20+
def mock_vcs_response():
21+
# Mock the vcs_response from fetch_via_vcs
22+
mock_response = MagicMock()
23+
mock_response.dest_dir = "/mock/repo"
24+
mock_response.delete = MagicMock()
25+
return mock_response
26+
27+
28+
@pytest.fixture
29+
def mock_fetch_via_vcs(mock_vcs_response):
30+
with patch("vulnerabilities.pipelines.v2_importers.pypa_importer.fetch_via_vcs") as mock:
31+
mock.return_value = mock_vcs_response
32+
yield mock
33+
34+
35+
@pytest.fixture
36+
def mock_pathlib(tmp_path):
37+
# Mock the Path structure to simulate the `vulns` directory and advisory files
38+
vulns_dir = tmp_path / "vulns"
39+
vulns_dir.mkdir()
40+
41+
advisory_file = vulns_dir / "CVE-2021-1234.yaml"
42+
advisory_file.write_text(
43+
"""
44+
id: CVE-2021-1234
45+
summary: Sample PyPI vulnerability
46+
references:
47+
- https://pypi.org/advisory/CVE-2021-1234
48+
"""
49+
)
50+
return vulns_dir
51+
52+
53+
def test_clone(mock_fetch_via_vcs, mock_vcs_response):
54+
# Import inside the test function to avoid circular import
55+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
56+
57+
# Test the `clone` method to ensure it calls `fetch_via_vcs`
58+
pipeline = PyPaImporterPipeline()
59+
pipeline.clone()
60+
61+
mock_fetch_via_vcs.assert_called_once_with(pipeline.repo_url)
62+
assert pipeline.vcs_response == mock_vcs_response
63+
64+
65+
def test_advisories_count(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
66+
# Import inside the test function to avoid circular import
67+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
68+
69+
# Mock `vcs_response.dest_dir` to point to the temporary directory
70+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
71+
72+
pipeline = PyPaImporterPipeline()
73+
74+
# Call clone() to set the vcs_response attribute
75+
pipeline.clone()
76+
mock_fetch_via_vcs.assert_called_once_with(pipeline.repo_url)
77+
78+
count = pipeline.advisories_count()
79+
80+
# Check that the count matches the number of YAML files in the `vulns` directory
81+
assert count == 1
82+
83+
84+
def test_collect_advisories(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
85+
# Import inside the test function to avoid circular import
86+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
87+
88+
# Mock `vcs_response.dest_dir` to point to the temporary directory
89+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
90+
91+
# Mock `parse_advisory_data` to return an AdvisoryData object
92+
with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:
93+
mock_parse.return_value = AdvisoryData(
94+
advisory_id="CVE-2021-1234",
95+
summary="Sample PyPI vulnerability",
96+
references_v2=[{"url": "https://pypi.org/advisory/CVE-2021-1234"}],
97+
affected_packages=[],
98+
weaknesses=[],
99+
url="https://pypi.org/advisory/CVE-2021-1234",
100+
)
101+
102+
pipeline = PyPaImporterPipeline()
103+
pipeline.clone()
104+
mock_fetch_via_vcs.assert_called_once_with(pipeline.repo_url)
105+
advisories = list(pipeline.collect_advisories())
106+
107+
# Ensure that advisories are parsed correctly
108+
assert len(advisories) == 1
109+
advisory = advisories[0]
110+
assert advisory.advisory_id == "CVE-2021-1234"
111+
assert advisory.summary == "Sample PyPI vulnerability"
112+
assert advisory.url == "https://pypi.org/advisory/CVE-2021-1234"
113+
114+
115+
def test_clean_downloads(mock_vcs_response):
116+
# Import inside the test function to avoid circular import
117+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
118+
119+
# Test the `clean_downloads` method to ensure the repository is deleted
120+
pipeline = PyPaImporterPipeline()
121+
pipeline.vcs_response = mock_vcs_response
122+
123+
pipeline.clean_downloads()
124+
125+
mock_vcs_response.delete.assert_called_once()
126+
127+
128+
def test_on_failure(mock_vcs_response):
129+
# Import inside the test function to avoid circular import
130+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
131+
132+
# Test the `on_failure` method to ensure `clean_downloads` is called on failure
133+
pipeline = PyPaImporterPipeline()
134+
pipeline.vcs_response = mock_vcs_response
135+
136+
with patch.object(pipeline, "clean_downloads") as mock_clean:
137+
pipeline.on_failure()
138+
139+
mock_clean.assert_called_once()
140+
141+
142+
def test_collect_advisories_with_invalid_yaml(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
143+
# Import inside the test function to avoid circular import
144+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
145+
146+
# Create an invalid YAML file
147+
invalid_file = mock_pathlib / "invalid_file.yaml"
148+
invalid_file.write_text("invalid_yaml")
149+
150+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
151+
152+
with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:
153+
# Mock parse_advisory_data to raise an error on invalid YAML
154+
mock_parse.side_effect = saneyaml.YAMLError("Invalid YAML")
155+
156+
pipeline = PyPaImporterPipeline()
157+
pipeline.clone()
158+
mock_fetch_via_vcs.assert_called_once_with(pipeline.repo_url)
159+
with pytest.raises(saneyaml.YAMLError):
160+
list(pipeline.collect_advisories())
161+
162+
163+
def test_advisories_count_empty(mock_vcs_response, mock_fetch_via_vcs):
164+
# Import inside the test function to avoid circular import
165+
from vulnerabilities.pipelines.v2_importers.pypa_importer import PyPaImporterPipeline
166+
167+
# Mock an empty 'vulns' directory
168+
mock_vcs_response.dest_dir = "/mock/empty_repo"
169+
pipeline = PyPaImporterPipeline()
170+
pipeline.clone()
171+
# Test that advisories_count returns 0 for an empty directory
172+
count = pipeline.advisories_count()
173+
assert count == 0

0 commit comments

Comments
 (0)