Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
from vulnerabilities.pipelines.v2_importers import (
elixir_security_importer as elixir_security_importer_v2,
)
Expand Down Expand Up @@ -103,6 +104,7 @@
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
nginx_importer_v2.NginxImporterPipeline,
debian_importer_v2.DebianImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
apache_tomcat_v2.ApacheTomcatImporterPipeline,
nvd_importer.NVDImporterPipeline,
Expand Down
187 changes: 187 additions & 0 deletions vulnerabilities/pipelines/v2_importers/debian_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import re
from typing import Any
from typing import Iterable
from typing import Mapping

from packageurl import PackageURL
from univers.version_range import DebianVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import create_weaknesses_list
from vulnerabilities.utils import dedupe
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import get_item


class DebianImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Debian Importer Pipeline"""

pipeline_id = "debian_importer_v2"
spdx_license_expression = "LicenseRef-scancode-other-permissive"
license_url = "https://www.debian.org/license"
notice = """
From: Tushar Goel <tgoel@nexb.com>
Date: Thu, May 12, 2022 at 11:42 PM +00:00
Subject: Usage of Debian Security Data in VulnerableCode
To: <team@security.debian.org>
Hey,
We would like to integrate the debian security data in vulnerablecode
[1][2] which is a FOSS db of FOSS vulnerability data. We were not able
to know under which license the debian security data comes. We would
be grateful to have your acknowledgement over usage of the debian
security data in vulnerablecode and have some kind of licensing
declaration from your side.
[1] - https://github.com/nexB/vulnerablecode
[2] - https://github.com/nexB/vulnerablecode/pull/723
Regards,
From: Moritz Mühlenhoff <jmm@inutil.org>
Date: Wed, May 17, 2022, 19:12 PM +00:00
Subject: Re: Usage of Debian Security Data in VulnerableCode
To: Tushar Goel <tgoel@nexb.com>
Cc: <team@security.debian.org>
Am Thu, May 12, 2022 at 05:12:48PM +0530 schrieb Tushar Goel:
> Hey,
>
> We would like to integrate the debian security data in vulnerablecode
> [1][2] which is a FOSS db of FOSS vulnerability data. We were not able
> to know under which license the debian security data comes. We would
> be grateful to have your acknowledgement over usage of the debian
> security data in vulnerablecode and have some kind of licensing
> declaration from your side.
We don't have a specific license, but you have our endorsemen to
reuse the data by all means :-)
Cheers,
Moritz
"""

api_url = "https://security-tracker.debian.org/tracker/data/json"
response = None

@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)

def get_response(self):
try:
response = fetch_response(self.api_url)
if response:
return response.json()
return {}
except Exception as e:
self.log(f"Error fetching data from {self.api_url!r}: {e}")
return {}

def advisories_count(self) -> int:
adv_count = 0
if not self.response:
self.response = self.get_response()
for pkg in self.response:
recs = len(self.response[pkg])
adv_count += recs
return adv_count

def collect_advisories(self) -> Iterable[AdvisoryData]:
if not self.response:
self.response = self.get_response()
for pkg_name, records in self.response.items():
yield from self.parse(pkg_name, records)

def parse(self, pkg_name: str, records: Mapping[str, Any]) -> Iterable[AdvisoryData]:

for record_identifier, record in records.items():
affected_versions = []
fixed_versions = []

releases = record["releases"].items()
for release_name, release_record in releases:
version = get_item(release_record, "repositories", release_name)

if not version:
self.log(
f"Version not found for {release_name} in {record} in package {pkg_name}"
)
continue

purl = PackageURL(
name=pkg_name,
type="deb",
namespace="debian",
qualifiers={"distro": release_name},
)

if release_record.get("status", "") == "resolved":
fixed_versions.append(version)
else:
affected_versions.append(version)

if release_record.get("fixed_version"):
fixed_versions.append(release_record["fixed_version"])

references = []
debianbug = record.get("debianbug")
if debianbug:
bug_url = f"https://bugs.debian.org/cgi-bin/bugreport.cgi?bug={debianbug}"
references.append(ReferenceV2(url=bug_url, reference_id=str(debianbug)))
affected_versions = dedupe(affected_versions)
fixed_versions = dedupe(fixed_versions)
if affected_versions:
affected_version_range = DebianVersionRange.from_versions(affected_versions)
else:
affected_version_range = None
affected_packages = []
for fixed_version in fixed_versions:
affected_packages.append(
AffectedPackageV2(
package=purl,
affected_version_range=affected_version_range,
fixed_version_range=DebianVersionRange.from_versions([fixed_version]),
)
)
weaknesses = get_cwe_from_debian_advisory(record)

yield AdvisoryData(
advisory_id=f"{pkg_name}/{record_identifier}",
aliases=[record_identifier],
summary=record.get("description", ""),
affected_packages=affected_packages,
references=references,
weaknesses=weaknesses,
url=f"https://security-tracker.debian.org/tracker/{record_identifier}",
)


def get_cwe_from_debian_advisory(record):
"""
Extracts CWE ID strings from the given raw_data and returns a list of CWE IDs.
>>> get_cwe_from_debian_advisory({"description":"PEAR HTML_QuickForm version 3.2.14 contains an eval injection (CWE-95) vulnerability in HTML_QuickForm's getSubmitValue method, HTML_QuickForm's validate method, HTML_QuickForm_hierselect's _setOptions method, HTML_QuickForm_element's _findValue method, HTML_QuickForm_element's _prepareValue method. that can result in Possible information disclosure, possible impact on data integrity and execution of arbitrary code. This attack appear to be exploitable via A specially crafted query string could be utilised, e.g. http://www.example.com/admin/add_practice_type_id[1]=fubar%27])%20OR%20die(%27OOK!%27);%20//&mode=live. This vulnerability appears to have been fixed in 3.2.15."})
[95]
>>> get_cwe_from_debian_advisory({"description":"There is no WEAKNESS DATA"})
[]
"""
description = record.get("description") or ""
pattern = r"CWE-\d+"
cwe_strings = re.findall(pattern, description)
weaknesses = create_weaknesses_list(cwe_strings)
return weaknesses
136 changes: 136 additions & 0 deletions vulnerabilities/tests/pipelines/v2_importers/test_debian_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from unittest.mock import MagicMock
from unittest.mock import patch

import pytest
from packageurl import PackageURL
from univers.version_range import DebianVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines.v2_importers.debian_importer import DebianImporterPipeline
from vulnerabilities.pipelines.v2_importers.debian_importer import get_cwe_from_debian_advisory


@pytest.fixture
def importer():
return DebianImporterPipeline()


@pytest.fixture
def sample_response():
return {
"openssl": {
"CVE-2023-1234": {
"description": "Some vulnerability description (CWE-79)",
"debianbug": 123456,
"releases": {
"bullseye": {
"status": "resolved",
"repositories": {"bullseye": "1.1.1k-1"},
"fixed_version": "1.1.1k-2",
},
"bookworm": {
"status": "open",
"repositories": {"bookworm": "1.1.1l-1"},
},
},
}
}
}


def test_get_cwe_from_debian_advisory_with_cwe():
record = {"description": "This issue relates to improper input validation (CWE-20)."}

weaknesses = get_cwe_from_debian_advisory(record)

assert len(weaknesses) == 1
assert weaknesses[0] == 20


def test_get_cwe_from_debian_advisory_without_cwe():
record = {"description": "No weakness mentioned here."}

weaknesses = get_cwe_from_debian_advisory(record)

assert weaknesses == []


@patch("vulnerabilities.pipelines.v2_importers.debian_importer.fetch_response")
def test_get_response_success(mock_fetch, importer, sample_response):
mock_resp = MagicMock()
mock_resp.json.return_value = sample_response
mock_fetch.return_value = mock_resp

response = importer.get_response()

assert response == sample_response
mock_fetch.assert_called_once_with(importer.api_url)


@patch("vulnerabilities.pipelines.v2_importers.debian_importer.fetch_response")
def test_get_response_failure(mock_fetch, importer):
mock_fetch.side_effect = Exception("network error")

response = importer.get_response()

assert response == {}


def test_advisories_count(importer, sample_response):
importer.response = sample_response

count = importer.advisories_count()

assert count == 1


def test_collect_advisories(importer, sample_response):
importer.response = sample_response

advisories = list(importer.collect_advisories())

assert len(advisories) == 1
advisory = advisories[0]

assert isinstance(advisory, AdvisoryData)
assert advisory.advisory_id == "openssl/CVE-2023-1234"
assert advisory.summary.startswith("Some vulnerability")


def test_affected_packages_generation(importer, sample_response):
importer.response = sample_response

advisory = next(importer.collect_advisories())
affected_packages = advisory.affected_packages

assert len(affected_packages) == 2

for pkg in affected_packages:
assert isinstance(pkg, AffectedPackageV2)
assert isinstance(pkg.package, PackageURL)
assert isinstance(pkg.fixed_version_range, DebianVersionRange)


def test_debian_bug_reference(importer, sample_response):
importer.response = sample_response

advisory = next(importer.collect_advisories())
references = advisory.references

assert len(references) == 1
ref = references[0]

assert isinstance(ref, ReferenceV2)
assert ref.reference_id == "123456"
assert "bugs.debian.org" in ref.url
14 changes: 10 additions & 4 deletions vulnerabilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,16 @@ def fetch_response(url):
"""
Fetch and return `response` from the `url`
"""
response = requests.get(url)
if response.status_code == HTTPStatus.OK:
return response
raise Exception(f"Failed to fetch data from {url!r} with status code: {response.status_code!r}")
try:
response = requests.get(url)
if response.status_code == HTTPStatus.OK:
return response
raise Exception(
f"Failed to fetch data from {url!r} with status code: {response.status_code!r}"
)
except Exception as e:
logger.error(f"Error fetching data from {url!r}: {e}")
return None


# This should be a method on PackageURL
Expand Down