diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 82ee4525a..54b97adc2 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -47,6 +47,7 @@ from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import euvd_importer as euvd_importer_v2 from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2 from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2 from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2 @@ -75,6 +76,7 @@ pysec_importer_v2.PyPIImporterPipeline, xen_importer_v2.XenImporterPipeline, curl_importer_v2.CurlImporterPipeline, + euvd_importer_v2.EUVDImporterPipeline, oss_fuzz_v2.OSSFuzzImporterPipeline, istio_importer_v2.IstioImporterPipeline, postgresql_importer_v2.PostgreSQLImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/euvd_importer.py b/vulnerabilities/pipelines/v2_importers/euvd_importer.py new file mode 100644 index 000000000..0babfe060 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/euvd_importer.py @@ -0,0 +1,244 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +import logging +import math +import time +from datetime import datetime +from http import HTTPStatus +from typing import Iterable + +import requests +from dateutil import parser as dateparser + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.severity_systems import SCORING_SYSTEMS + +logger = logging.getLogger(__name__) + + +class EUVDImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """ + EUVD (EU Vulnerability Database) Importer Pipeline + + This pipeline imports security advisories from the European Union Vulnerability Database (EUVD). + """ + + pipeline_id = "euvd_importer_v2" + spdx_license_expression = "CC-BY-4.0" + license_url = "https://www.enisa.europa.eu/about-enisa/legal-notice/" + url = "https://euvdservices.enisa.europa.eu/api/search" + + def __init__(self): + super().__init__() + self._cached_data = None + + @classmethod + def steps(cls): + return (cls.collect_and_store_advisories,) + + def fetch_data(self): + if self._cached_data is not None: + logger.info(f"Using cached data: {len(self._cached_data)} items") + return self._cached_data + + all_items = [] + size = 100 + max_retries = 2 + + logger.info(f"Fetching data from EUVD API: {self.url}") + + total_count = self._fetch_total_count(size, max_retries) + if total_count is None: + logger.error("Failed to fetch total count from API") + return all_items + + total_pages = math.ceil(total_count / size) + logger.info(f"Total advisories: {total_count}, Total pages: {total_pages}") + + first_page_data = self._fetch_page(0, size, max_retries) + if first_page_data: + all_items.extend(first_page_data) + logger.info(f"Fetched page 0: {len(first_page_data)} items (total: {len(all_items)})") + + for page in range(1, total_pages): + page_data = self._fetch_page(page, size, max_retries) + if page_data is None: + logger.warning(f"Skipping page {page} after failed retries") + continue + + if not page_data: + logger.info(f"No items in response for page {page}; stopping fetch.") + break + + all_items.extend(page_data) + logger.info(f"Fetched page {page}: {len(page_data)} items (total: {len(all_items)})") + + logger.info(f"Fetch completed successfully. Total items collected: {len(all_items)}") + + self._cached_data = all_items + logger.info(f"Cached {len(all_items)} items for reuse") + + return all_items + + def _make_request_with_retry(self, params, max_retries, context): + headers = {"User-Agent": "VulnerableCode"} + + for attempt in range(max_retries): + try: + response = requests.get(self.url, headers=headers, params=params, timeout=30) + + if response.status_code != HTTPStatus.OK: + logger.error(f"API returned status {response.status_code} for {context}") + if attempt < max_retries - 1: + logger.info(f"Retrying {context} (attempt {attempt + 1}/{max_retries})") + time.sleep(3) + continue + return None + + return response.json() + + except requests.exceptions.Timeout: + logger.warning(f"Timeout on {context} (attempt {attempt + 1}/{max_retries})") + if attempt < max_retries - 1: + time.sleep(3) + continue + return None + + except requests.exceptions.RequestException as e: + logger.error( + f"Network error on {context}: {e} (attempt {attempt + 1}/{max_retries})" + ) + if attempt < max_retries - 1: + time.sleep(3) + continue + return None + + except (ValueError, KeyError) as e: + logger.error(f"Error parsing response for {context}: {e}") + return None + + return None + + def _fetch_total_count(self, size, max_retries): + """Fetch the total count of advisories from the API.""" + params = {"size": size, "page": 0} + data = self._make_request_with_retry(params, max_retries, "total count") + + if data is None: + return None + + total = data.get("total") + if total is None: + logger.error("No 'total' field in API response") + + return total + + def _fetch_page(self, page, size, max_retries): + """Fetch a single page of advisories from the API.""" + params = {"size": size, "page": page} + data = self._make_request_with_retry(params, max_retries, f"page {page}") + + if data is None: + return None + + return data.get("items", []) + + def advisories_count(self) -> int: + return len(self.fetch_data()) + + def collect_advisories(self) -> Iterable[AdvisoryData]: + for raw_data in self.fetch_data(): + try: + advisory = self.parse_advisory(raw_data) + if advisory: + yield advisory + except (ValueError, KeyError, TypeError) as e: + logger.error(f"Failed to parse advisory: {e}") + logger.debug(f"Raw data: {raw_data}") + continue + + def parse_advisory(self, raw_data: dict) -> AdvisoryData: + advisory_id = raw_data.get("id", "") + + aliases = [advisory_id] if advisory_id else [] + aliases_str = raw_data.get("aliases", "") + if aliases_str: + cve_aliases = [alias.strip() for alias in aliases_str.split("\n") if alias.strip()] + aliases.extend(cve_aliases) + + summary = raw_data.get("description", "") + + date_published = None + date_str = raw_data.get("datePublished", "") + if date_str: + try: + date_published = dateparser.parse(date_str) + if date_published and date_published.tzinfo is None: + date_published = date_published.replace( + tzinfo=datetime.now().astimezone().tzinfo + ) + except (ValueError, TypeError) as e: + logger.warning(f"Failed to parse date '{date_str}': {e}") + + references = [] + references_str = raw_data.get("references", "") + if references_str: + urls = [url.strip() for url in references_str.split("\n") if url.strip()] + for url in urls: + references.append(ReferenceV2(url=url)) + + if advisory_id: + advisory_url = f"https://euvd.enisa.europa.eu/vulnerability/{advisory_id}" + references.append(ReferenceV2(url=advisory_url)) + + severities = [] + base_score = raw_data.get("baseScore") + base_score_version = raw_data.get("baseScoreVersion") + base_score_vector = raw_data.get("baseScoreVector") + + if base_score and base_score_version: + scoring_system = self.get_scoring_system(base_score_version) + if scoring_system: + severity = VulnerabilitySeverity( + system=scoring_system, + value=str(base_score), + scoring_elements=base_score_vector or "", + ) + severities.append(severity) + + return AdvisoryData( + advisory_id=advisory_id, + aliases=aliases, + summary=summary, + references_v2=references, + affected_packages=[], + date_published=date_published, + url=advisory_url if advisory_id else "", + severities=severities, + original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False), + ) + + @staticmethod + def get_scoring_system(version: str): + version_map = { + "4.0": "cvssv4", + "3.1": "cvssv3.1", + "3.0": "cvssv3", + "2.0": "cvssv2", + } + system_key = version_map.get(version) + if system_key: + return SCORING_SYSTEMS.get(system_key) + logger.warning(f"Unknown CVSS version: {version}") + return None diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_euvd_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_euvd_importer_v2.py new file mode 100644 index 000000000..106b37cf0 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_euvd_importer_v2.py @@ -0,0 +1,78 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +from pathlib import Path +from unittest import TestCase +from unittest.mock import Mock +from unittest.mock import patch + +from vulnerabilities.pipelines.v2_importers.euvd_importer import EUVDImporterPipeline +from vulnerabilities.tests import util_tests + +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "euvd" + + +class TestEUVDImporterPipeline(TestCase): + @patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get") + def test_collect_advisories(self, mock_get): + """Test collecting and parsing advisories from test data""" + sample1_path = TEST_DATA / "euvd_sample1.json" + + sample1 = json.loads(sample1_path.read_text(encoding="utf-8")) + + mock_responses = [ + Mock(status_code=200, json=lambda: sample1), + Mock(status_code=200, json=lambda: sample1), + ] + mock_get.side_effect = mock_responses + + pipeline = EUVDImporterPipeline() + advisories = [data.to_dict() for data in list(pipeline.collect_advisories())] + + expected_file = TEST_DATA / "euvd-expected.json" + util_tests.check_results_against_json(advisories, expected_file) + + def test_get_scoring_system(self): + """Test CVSS version to scoring system mapping""" + pipeline = EUVDImporterPipeline() + + system_v4 = pipeline.get_scoring_system("4.0") + assert system_v4 is not None + assert system_v4.identifier == "cvssv4" + + system_v31 = pipeline.get_scoring_system("3.1") + assert system_v31 is not None + assert system_v31.identifier == "cvssv3.1" + + system_v3 = pipeline.get_scoring_system("3.0") + assert system_v3 is not None + assert system_v3.identifier == "cvssv3" + + system_v2 = pipeline.get_scoring_system("2.0") + assert system_v2 is not None + assert system_v2.identifier == "cvssv2" + + system_unknown = pipeline.get_scoring_system("unknown") + assert system_unknown is None + + @patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get") + def test_advisories_count(self, mock_get): + """Test counting advisories""" + sample_data = {"items": [{"id": "1"}, {"id": "2"}, {"id": "3"}], "total": 3} + mock_responses = [ + Mock(status_code=200, json=lambda: sample_data), + Mock(status_code=200, json=lambda: sample_data), + ] + mock_get.side_effect = mock_responses + + pipeline = EUVDImporterPipeline() + count = pipeline.advisories_count() + + assert count == 3 diff --git a/vulnerabilities/tests/test_data/euvd/euvd-expected.json b/vulnerabilities/tests/test_data/euvd/euvd-expected.json new file mode 100644 index 000000000..3a6dab3a8 --- /dev/null +++ b/vulnerabilities/tests/test_data/euvd/euvd-expected.json @@ -0,0 +1,116 @@ +[ + { + "advisory_id": "EUVD-2022-0569", + "aliases": [ + "EUVD-2022-0569", + "CVE-2018-1109", + "GHSA-cwfw-4gq5-mrqx" + ], + "summary": "A vulnerability was found in Braces versions 2.2.0 and above, prior to 2.3.1. Affected versions of this package are vulnerable to Regular Expression Denial of Service (ReDoS) attacks.", + "affected_packages": [], + "references_v2": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://nvd.nist.gov/vuln/detail/CVE-2018-1109" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/micromatch/braces/commit/abdafb0cae1e0c00f184abbadc692f4eaa98f451" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://bugzilla.redhat.com/show_bug.cgi?id=1547272" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2022-0569" + } + ], + "severities": [ + { + "system": "cvssv3.1", + "value": "5.3", + "scoring_elements": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:L" + } + ], + "date_published": "2021-03-30T01:52:55+00:00", + "weaknesses": [], + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2022-0569" + }, + { + "advisory_id": "EUVD-2025-199883", + "aliases": [ + "EUVD-2025-199883", + "CVE-2025-66027" + ], + "summary": "Rallly is an open-source scheduling and collaboration tool. Prior to version 4.5.6, an information disclosure vulnerability exposes participant details, including names and email addresses through the /api/trpc/polls.get,polls.participants.list endpoint, even when Pro privacy features are enabled. This bypasses intended privacy controls that should prevent participants from viewing other users' personal information. This issue has been patched in version 4.5.6.", + "affected_packages": [], + "references_v2": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/lukevella/rallly/security/advisories/GHSA-65wg-8xgw-f3fg" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/lukevella/rallly/commit/59738c04f9a8ec25f0af5ce20ad0eab6cf134963" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-199883" + } + ], + "severities": [ + { + "system": "cvssv4", + "value": "7.1", + "scoring_elements": "CVSS:4.0/AV:N/AC:L/AT:N/PR:L/UI:N/VC:H/VI:N/VA:N/SC:L/SI:N/SA:N" + } + ], + "date_published": "2025-11-29T00:43:02+00:00", + "weaknesses": [], + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-199883" + }, + { + "advisory_id": "EUVD-2025-199882", + "aliases": [ + "EUVD-2025-199882", + "CVE-2025-66034" + ], + "summary": "fontTools is a library for manipulating fonts, written in Python. In versions from 4.33.0 to before 4.60.2, the fonttools varLib (or python3 -m fontTools.varLib) script has an arbitrary file write vulnerability that leads to remote code execution when a malicious .designspace file is processed. The vulnerability affects the main() code path of fontTools.varLib, used by the fonttools varLib CLI and any code that invokes fontTools.varLib.main(). This issue has been patched in version 4.60.2.", + "affected_packages": [], + "references_v2": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/fonttools/fonttools/security/advisories/GHSA-768j-98cg-p3fv" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/fonttools/fonttools/commit/a696d5ba93270d5954f98e7cab5ddca8a02c1e32" + }, + { + "reference_id": "", + "reference_type": "", + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-199882" + } + ], + "severities": [ + { + "system": "cvssv3.1", + "value": "6.3", + "scoring_elements": "CVSS:3.1/AV:L/AC:H/PR:N/UI:R/S:C/C:N/I:H/A:L" + } + ], + "date_published": "2025-11-29T01:07:12+00:00", + "weaknesses": [], + "url": "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-199882" + } +] diff --git a/vulnerabilities/tests/test_data/euvd/euvd_sample1.json b/vulnerabilities/tests/test_data/euvd/euvd_sample1.json new file mode 100644 index 000000000..87b493436 --- /dev/null +++ b/vulnerabilities/tests/test_data/euvd/euvd_sample1.json @@ -0,0 +1,47 @@ +{ + "items": [ + { + "id": "EUVD-2022-0569", + "enisaUuid": "ff8d275e-fded-32e0-b1dd-74cbae780c34", + "description": "A vulnerability was found in Braces versions 2.2.0 and above, prior to 2.3.1. Affected versions of this package are vulnerable to Regular Expression Denial of Service (ReDoS) attacks.", + "datePublished": "Mar 30, 2021, 1:52:55 AM", + "dateUpdated": "Dec 1, 2025, 2:18:10 PM", + "baseScore": 5.3, + "baseScoreVersion": "3.1", + "baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:L", + "references": "https://nvd.nist.gov/vuln/detail/CVE-2018-1109\nhttps://github.com/micromatch/braces/commit/abdafb0cae1e0c00f184abbadc692f4eaa98f451\nhttps://bugzilla.redhat.com/show_bug.cgi?id=1547272\n", + "aliases": "CVE-2018-1109\nGHSA-cwfw-4gq5-mrqx\n", + "assigner": "redhat", + "epss": 0.27 + }, + { + "id": "EUVD-2025-199883", + "enisaUuid": "227b1294-1b89-32c6-acd8-2cc50af9ea1c", + "description": "Rallly is an open-source scheduling and collaboration tool. Prior to version 4.5.6, an information disclosure vulnerability exposes participant details, including names and email addresses through the /api/trpc/polls.get,polls.participants.list endpoint, even when Pro privacy features are enabled. This bypasses intended privacy controls that should prevent participants from viewing other users' personal information. This issue has been patched in version 4.5.6.", + "datePublished": "Nov 29, 2025, 12:43:02 AM", + "dateUpdated": "Dec 1, 2025, 2:11:22 PM", + "baseScore": 7.1, + "baseScoreVersion": "4.0", + "baseScoreVector": "CVSS:4.0/AV:N/AC:L/AT:N/PR:L/UI:N/VC:H/VI:N/VA:N/SC:L/SI:N/SA:N", + "references": "https://github.com/lukevella/rallly/security/advisories/GHSA-65wg-8xgw-f3fg\nhttps://github.com/lukevella/rallly/commit/59738c04f9a8ec25f0af5ce20ad0eab6cf134963\n", + "aliases": "CVE-2025-66027\n", + "assigner": "GitHub_M", + "epss": 0.04 + }, + { + "id": "EUVD-2025-199882", + "enisaUuid": "911c0f6a-52a9-3013-a8e2-50ff32772b3f", + "description": "fontTools is a library for manipulating fonts, written in Python. In versions from 4.33.0 to before 4.60.2, the fonttools varLib (or python3 -m fontTools.varLib) script has an arbitrary file write vulnerability that leads to remote code execution when a malicious .designspace file is processed. The vulnerability affects the main() code path of fontTools.varLib, used by the fonttools varLib CLI and any code that invokes fontTools.varLib.main(). This issue has been patched in version 4.60.2.", + "datePublished": "Nov 29, 2025, 1:07:12 AM", + "dateUpdated": "Dec 1, 2025, 2:11:17 PM", + "baseScore": 6.3, + "baseScoreVersion": "3.1", + "baseScoreVector": "CVSS:3.1/AV:L/AC:H/PR:N/UI:R/S:C/C:N/I:H/A:L", + "references": "https://github.com/fonttools/fonttools/security/advisories/GHSA-768j-98cg-p3fv\nhttps://github.com/fonttools/fonttools/commit/a696d5ba93270d5954f98e7cab5ddca8a02c1e32\n", + "aliases": "CVE-2025-66034\n", + "assigner": "GitHub_M", + "epss": 0.09 + } + ], + "total": 5 +}